Kafka-安装和使用(Quickstart)

Step1:下载Kafka

官网下载地址下载最新版本并解压:

tar -xzf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

Step2:配置

  1. ZooKeeper的配置,参考前面的ZooKeeper的文章,此处不详细做说明。
  2. Kafka的配置
    在Kafka的home目录下,vim config/server.properties命令,最主要的是设置ZooKeeper连接的地址和接口:
    zookeeper.connect=localhost:2181

Step3:启动服务

Kafka用到了ZooKeeper,所以要先启动ZooKeepr。
第一种方式(推荐)是启动独立的ZooKeeper程序,在$ZOOKEEPER_HOME/bin目录下,执行./zkServer.sh start
第二种方式是启动Kafka内置的一个单节点ZooKeeper实例,方法是在$KAFKA_HOME目录下执行:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动了ZooKeeper之后,就可以启动Kafka服务了,在$KAFKA_HOME目录下执行:

bin/kafka-server-start.sh config/server.properties

如果要以守护线程启动,则执行:

bin/kafka-server-start.sh -daemon config/server.properties

Step4:创建一个topic

创建一个叫做testTopic的topic,并且它只有一个分区,一个副本。

1
2
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --topic testTopic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "testTopic".

partitions参数:表示主题的分区(partition)的数量。
replication-factor参数:表示每个partition的备份数量。
zookeeper参数:表示zookeeper的主机地址和客户端连接端口。如果zookeeper是集群,这里也可以只写一个节点就行。

列出Kafka上的所有topic:

1
2
3
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
myTest
testTopic

上面说明我的kafka已经创建了两个主题:myTest和testTopic。

Step5:发送消息

Kafka 使用一个简单的命令行客户端producer,可以从文件中或者从标准输入中读取消息并发送到Kafka服务端集群。默认的每一行都将作为一条独立的消息

运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

[root@myServer kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
>This is a message
>This is another message

Step6:启动consumer

Kafka也有一个命令行consumer可以读取消息并输出到标准输出:

[root@myServer kafka_2.11-1.0.0]#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning
This is a message
This is another message

如果你在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。
这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。

Step7:搭建一个多个broker的集群

刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点也都是在本机上的:
首先为每个节点编写配置文件:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

在拷贝出的新文件中修改如下3个参数:

config/server-1.properties:

1
2
3
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:

1
2
3
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

broker.id在集群中唯一的标注一个节点,因为在同一个机器上,所以必须制定不同的端口和日志文件,避免数据被覆盖。
刚才已经启动可Zookeeper和一个节点,现在启动另外两个Kafka节点:

1
2
3
4
> bin/kafka-server-start.sh  -daemon config/server-1.properties 
...
> bin/kafka-server-start.sh -daemon config/server-2.properties
...

查看进程:

1
2
3
4
5
6
[root@server01 kafka_2.11-1.0.0]# jps -lm
9857 sun.tools.jps.Jps -lm
1737 org.apache.zookeeper.server.quorum.QuorumPeerMain /home/hadoop/app/zookeeper-3.4.11/bin/../conf/zoo.cfg
9517 kafka.Kafka config/server-2.properties
5566 kafka.Kafka ../config/server.properties
9199 kafka.Kafka config/server-1.properties

创建一个拥有3个副本的topic:

1
2
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic
Created topic "my-replicated-topic".

现在我们搭建了一个集群,怎么知道每个节点的信息呢?运行“”describe topics”命令就可以了:

1
2
3
4
5
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

下面解释一下这些输出。第一行是对所有分区的一个描述,然后每个分区都会对应一行,我们创建topic的时候指明是3个分区,所以有3行。
leader:负责处理消息的读和写的broker,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.

向topic发送消息:

1
2
3
4
5
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

消费这些消息:

1
2
3
4
5
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 2
my test message 1
^C

这里接收到的顺序不一致时因为发送到了不同的partiton所致。

测试一下kafka集群的容错能力。对于partition=1,其leader是brokerid=0的节点,现在我们kill掉它:

1
2
3
> ps -aux | grep server.properties
root 32658 0.5 17.4 4305700 329028 pts/0 Sl+ 10:06 0:15 /opt/jdk1.8.0_121/bin/java...
> kill -9 32658

再次查询该topic,可以看到另外一个brokerid=1的节点被选做了leader,brokeid=0的节点不再出现在 in-sync 副本列表中:

1
2
3
4
5
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2
Topic: my-replicated-topic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1
Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

虽然最初负责写消息的leader down掉了,但之前的消息还是可以消费的,不过bootstrap-server参数要选择没有down掉的节点:

1
2
3
4
5
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic
...
my test message 2
my test message 1
^C

------ 本文完 ------