Kafka-常用命令

Kafka支持的基本命令位于${KAFKA_HOME}/bin文件夹中。

kafka-topics.sh(Topic操作)

创建一个主题

bin/kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 2 --partitions 3 --topic my-replicated-topic

partitions参数:表示主题的分区(partition)的数量。
replication-factor参数:表示每个partition的备份数量。
zookeeper参数:表示zookeeper的主机地址和客户端连接端口。该值和server.properties文件中的配置项{zookeeper.connect}一样。如果zookeeper是集群,这里也可以只写一个节点就行。
flush.messages:这个参数是用来保证消息发送的可靠性,一般情况下不需要设置。在Kafka-Kafka为什么可以做到高吞吐量、低延迟这篇文章里我们讲了Kafka的读写利用了操作系统的Page Cache,默认情况下Kafka是异步持久化数据,也就是说把消息写入到Page Cache之后就被Kakfa认为消息写入成功了,但后面其实还需要把数据从Page Cache中批量写入到磁盘上,而Page Cache本质上就内存缓存,考虑比较极端的情况,如果从内存同步到磁盘的过程中机器宕机了,那么这一部分数据就丢失了。如果将flush.messages设置为1,那么每有一条消息写入到Page Cache系统就会把这条数据持久化到磁盘,配合acks、min.insync.replicas这些配置项,会使消息可靠性得到大幅度得提升,但是flush.messages=1会严重影响性能,可以在部分可靠性要求很高的Topic才进行配置。
max.message.bytes:设置topic的每条消息最大为多少,单位是byte。

上面的创建方式kafka会自动把各个partition的replication分配到相应的broker。
也可以在创建topic时手动指定哪个分区的哪个replication落在指定的broker,示例命令如下:

bin/kafka-topics.sh  --create --topic testTopic --zookeeper localhost:2181  --replica-assignment 0:1,1:2

关键配置参数为replica-assignment,该参数不能与–partitions和–replication-factor同时出现,参数的使用格式如下(假设replication数量为2):
broker_id_for_partition0_replication1: broker_id_for_partition0_replication2,
broker_id_for_partition1_replication1: broker_id_for_partition1_replication2,
broker_id_for_partition2_replication1: broker_id_for_partition2_replication2

–replica-assignment 0:1,1:2表示有两个分区,分区0的replica1在broker.id=0的kafka服务上,分区0的replica2在broker.id=1的kafka服务上;分区1的replica1在broker.id=1的kafka服务上,分区1的replica2在broker.id=2的kafka服务上。

列出所有主题

bin/kafka-topics.sh --list --zookeeper localhost:2181

示例:

1
2
3
4
5
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
my-replicated-topic
myTest
testTopic

注意:
__consumer_offsets 是一个特殊的主题,它不是有外部生产者创建的。
因为Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的消费的位移信息保存在Kafka内部的topic中,即 __consumer_offsets这个topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

查看某个主题(topic)的具体信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

示例:

1
2
3
4
5
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

修改topic信息

1
2
3
4
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --config max.message.bytes=128000
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --delete-config max.message.bytes
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 10
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 3 ## Kafka分区数量只允许增加,不允许减少

删除topic

bin/kafka-topics.sh --delete --topic testTopic --zookeeper localhost:2181

Note: This will have no impact if delete.topic.enable is not set to true.
默认情况下,删除是标记删除,没有实际删除这个Topic。

如果要真正删除Topic,有两种方式:

  • 方式一:通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可
  • 方式二:配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示真正进行了Topic的删除

kafka-consumer-groups.sh(消费组管理)

可以列出所有消费者组,查看某个消费者组的详细情况以及删除消费者组的信息(删除只适用于旧版本基于zookeeper的消费者组)。

列出消费者组

先用kafka-console-consumer.sh启动一个消费者,并加入一个叫做“test_group”的组:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.100.1:9092 --topic testTopic --consumer-property group.id=test_group

接着使用以下命令列出所有的消费都组:

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.100.1:9092 --list
1
2
3
4
5
[root@server01 kafka_2.11-1.0.0]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Note: This will not show information about old Zookeeper-based consumers.

test_group
console-consumer-77984

注意:老版本的kafka消费组的信息是保存在zookeeper上的,需要用bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list这个命令。
从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以使用bootstrap-server来查看。

查看某个消费者组

查看消费者组的具体消费状况,结合来分析目前集群的稳健程度,执行以下命令:

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.100.1:9092 --describe --group test_group
1
2
3
4
5
6
7
8
[root@server01 kafka_2.11-1.0.0]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test_group
Note: This will not show information about old Zookeeper-based consumers.


TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myTest 0 0 0 0 consumer-1-843dd48a-168b-4d11-b4cf-65e44bcc4d5c /192.168.128.101 consumer-1
myTest 1 0 0 0 consumer-1-843dd48a-168b-4d11-b4cf-65e44bcc4d5c /192.168.128.101 consumer-1
testTopic 0 5 5 0 - - -
  • CURRENT-OFFSET:当前已消费条数。
  • LOG-END-OFFSET:消息总条数。
  • LAG:剩余消费条数。
  • CONSUMER-ID:消费组的id。
  • HOST:消息消费者的ip。
  • CLIENT-ID:客户端的id。

上面显示,这个消费者组订阅了myTest和testTopic两个Topic,其中myTest是有两个partition。

执行上面命令的时候,如果把kafka服务器的ip改成用localhost代替,即使用bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test_group命令,会出现如下报错:

1
WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

这是应为在配置文件server.properties中配置的hostname不是localhost,而是使用ip的方式,所以执行kafka命令的时候需要保持一致。

------ 本文完 ------