Kafka支持的基本命令位于${KAFKA_HOME}/bin文件夹中。
kafka-topics.sh(Topic操作)
创建一个主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 2 --partitions 3 --topic my-replicated-topic
partitions参数:表示主题的分区(partition)的数量。
replication-factor参数:表示每个partition的备份数量。
zookeeper参数:表示zookeeper的主机地址和客户端连接端口。该值和server.properties文件中的配置项{zookeeper.connect}一样。如果zookeeper是集群,这里也可以只写一个节点就行。
flush.messages:这个参数是用来保证消息发送的可靠性,一般情况下不需要设置。在Kafka-Kafka为什么可以做到高吞吐量、低延迟这篇文章里我们讲了Kafka的读写利用了操作系统的Page Cache,默认情况下Kafka是异步持久化数据,也就是说把消息写入到Page Cache之后就被Kakfa认为消息写入成功了,但后面其实还需要把数据从Page Cache中批量写入到磁盘上,而Page Cache本质上就内存缓存,考虑比较极端的情况,如果从内存同步到磁盘的过程中机器宕机了,那么这一部分数据就丢失了。如果将flush.messages设置为1,那么每有一条消息写入到Page Cache系统就会把这条数据持久化到磁盘,配合acks、min.insync.replicas这些配置项,会使消息可靠性得到大幅度得提升,但是flush.messages=1会严重影响性能,可以在部分可靠性要求很高的Topic才进行配置。
max.message.bytes:设置topic的每条消息最大为多少,单位是byte。
上面的创建方式kafka会自动把各个partition的replication分配到相应的broker。
也可以在创建topic时手动指定哪个分区的哪个replication落在指定的broker,示例命令如下:
bin/kafka-topics.sh --create --topic testTopic --zookeeper localhost:2181 --replica-assignment 0:1,1:2
关键配置参数为replica-assignment,该参数不能与–partitions和–replication-factor同时出现,参数的使用格式如下(假设replication数量为2):
broker_id_for_partition0_replication1: broker_id_for_partition0_replication2,
broker_id_for_partition1_replication1: broker_id_for_partition1_replication2,
broker_id_for_partition2_replication1: broker_id_for_partition2_replication2
–replica-assignment 0:1,1:2表示有两个分区,分区0的replica1在broker.id=0的kafka服务上,分区0的replica2在broker.id=1的kafka服务上;分区1的replica1在broker.id=1的kafka服务上,分区1的replica2在broker.id=2的kafka服务上。
列出所有主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
示例:
1 | [root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181 |
注意:
__consumer_offsets 是一个特殊的主题,它不是有外部生产者创建的。
因为Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的消费的位移信息保存在Kafka内部的topic中,即 __consumer_offsets这个topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。
查看某个主题(topic)的具体信息:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
示例:
1 | [root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic |
修改topic信息
1 | bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --config max.message.bytes=128000 |
删除topic
bin/kafka-topics.sh --delete --topic testTopic --zookeeper localhost:2181
Note: This will have no impact if delete.topic.enable is not set to true.
默认情况下,删除是标记删除,没有实际删除这个Topic。
如果要真正删除Topic,有两种方式:
- 方式一:通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可
- 方式二:配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示真正进行了Topic的删除
kafka-consumer-groups.sh(消费组管理)
可以列出所有消费者组,查看某个消费者组的详细情况以及删除消费者组的信息(删除只适用于旧版本基于zookeeper的消费者组)。
列出消费者组
先用kafka-console-consumer.sh启动一个消费者,并加入一个叫做“test_group”的组:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.100.1:9092 --topic testTopic --consumer-property group.id=test_group
接着使用以下命令列出所有的消费都组:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.100.1:9092 --list
1 | [root@server01 kafka_2.11-1.0.0]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list |
注意:老版本的kafka消费组的信息是保存在zookeeper上的,需要用bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
这个命令。
从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以使用bootstrap-server来查看。
查看某个消费者组
查看消费者组的具体消费状况,结合来分析目前集群的稳健程度,执行以下命令:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.100.1:9092 --describe --group test_group
1 | [root@server01 kafka_2.11-1.0.0]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test_group |
- CURRENT-OFFSET:当前已消费条数。
- LOG-END-OFFSET:消息总条数。
- LAG:剩余消费条数。
- CONSUMER-ID:消费组的id。
- HOST:消息消费者的ip。
- CLIENT-ID:客户端的id。
上面显示,这个消费者组订阅了myTest和testTopic两个Topic,其中myTest是有两个partition。
执行上面命令的时候,如果把kafka服务器的ip改成用localhost代替,即使用bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test_group
命令,会出现如下报错:
1 | WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) |
这是应为在配置文件server.properties
中配置的hostname不是localhost,而是使用ip的方式,所以执行kafka命令的时候需要保持一致。
v1.5.2