Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。本篇在此记录一些常用的kafka shell

Kafka版本:kafka_2.11-2.1.0

创建Topic

1
bin/kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic test_topic --partitions 2 --replication-factor 2

说明:

  • --zookeeper也可以只填其中任意一台zookeeper服务
  • --partitions表示创建的分区数,通常用于提高kafka的吞吐量,但也不是多多益善,具体看需求场景
  • --replication-factor 表示每个分区中消息的副本数,通常都设置为broker的数量,具体看需求场景。

列出所有的Topic

1
bin/kafka-topics.sh --zookeeper master:2181 --list

这里我们至指定了一个zookeeper节点。

查看指定的Topic的描述信息

1
bin/kafka-topics.sh --zookeeper master:2181 --describe --topic test_topic

输入如下信息:

1
2
3
Topic:test_topic        PartitionCount:2        ReplicationFactor:2     Configs:
Topic: test_topic Partition: 0 Leader: 11 Replicas: 11,10 Isr: 11,10
Topic: test_topic Partition: 1 Leader: 10 Replicas: 10,11 Isr: 10,11

给指定Topic生产数据

1
bin/kafka-console-producer.sh --broker-list slave1:9092,slave2:9092 --topic test_topic

这里的--broker-list也可以只填写其中一个节点。

消费指定Topic中的数据

1
bin/kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic test_topic --from-beginning
  • --bootstrap-server表示kafka的broker,可以都写全也可以只写一个
  • --from-beginning 表示从开始进行消费,不加则只消费consumer启动后的数据

查看指定Topic中offset的最大(小)值

查看offset的最大值

1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave1:9092,slave2:9092 --topic gpsnj --time -1

此时会输入如下内容:

1
2
test_topic:0:2
test_topic:1:2

这表示test_topic中分区0的offset的最大值为2,分区1的offset的最大值为2,在数据了比较大的情况下,这两个值一般都不一样。

查看offset的最小值

1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave1:9092,slave2:9092 --topic gpsnj --time -2

此时会输入如下内容:

1
2
test_topic:0:0
test_topic:1:0

这表示test_topic中分区0的offset的最小值为2,分区1的offset的最小值为2,在数据了比较大的情况下,这两个值一般都不一样。

查看topic指定分区的offset最大最小值

主需要在上面两个命令中加入--partitions即可

1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave1:9092,slave2:9092 --topic test_topic --time -1 --partitions 0

time为-1时表示最大值,time为-2时表示最小值

增加分区数

1
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test_topic --partitions 5

这表示将test_topic的分区数修改为5个分区,此时这个topic总共有5个分区,而不是7个,可以按照之前介绍的命令查看一下

1
2
3
4
5
6
Topic:test_topic        PartitionCount:5        ReplicationFactor:2     Configs:
Topic: test_topic Partition: 0 Leader: 11 Replicas: 11,10 Isr: 11,10
Topic: test_topic Partition: 1 Leader: 10 Replicas: 10,11 Isr: 10,11
Topic: test_topic Partition: 2 Leader: 11 Replicas: 11,10 Isr: 11,10
Topic: test_topic Partition: 3 Leader: 10 Replicas: 10,11 Isr: 10,11
Topic: test_topic Partition: 4 Leader: 11 Replicas: 11,10 Isr: 11,10

删除Topic

1
bin/kafka-topics.sh --delete --zookeeper master:2181 --topic test_topic

此时会输入如下内容:

1
2
Topic test_topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

其实执行完命令后topic并没有被删除,而只是被标记删除。此时查看topic list依旧存在。解决办法有如下两种:

修改kafka配置

在kafka的配置文件中加入如下配置:

1
delete.topic.enable=true

此时,再使用命令即可成功删除。

删除kafka相关的文件

如果没有执行上述的命令,需要如下几个步骤才可以彻底删除topic
登录Zookeeper

1
zookeeper-client 

查看topic

1
ls /brokers/topics

此时会出现如下日志:

1
[syslog_web_1,test_topic]

删除这个节点

1
rmr /brokers/topics/test_topic

删除kafka存储目录
这个路径在server.propertieslog.dirs配置,如果配置了多个路径也都要删除,另外,如果kafka有多个broker,则需要针对每个broker中的数据进行删除,此时test_topic被彻底删除。

如果执行了上述的命令,需要如下几个步骤才可以彻底删除topic
登录到zookeeper

1
zookeeper-client 

找到被标记的topic

1
ls /admin/delete_topics

删除这个节点

1
rmr /admin/delete_topics/test_topic