Installing Kafka: Standalone and Cluster Modes in Detail

2017/06/08 · Distributed Systems
Environment used in this article:
Operating system: macOS
JDK version: 1.7.0_77, 64-bit
Kafka version: 0.9.0.x (the 2.10/2.11 in the package names below is the Scala version, not the Kafka version)

Download and extract the Kafka package

Kafka website: http://kafka.apache.org/
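
For example, the release used later in this article can be fetched from the Apache archive and unpacked under /opt; a sketch (the exact URL and version below are assumptions, so adjust them to the release you actually want):

# Download the 0.9.0.1 release built for Scala 2.11 (adjust version/mirror as needed)
wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
# Unpack it under /opt and enter the installation directory
tar -xzf kafka_2.11-0.9.0.1.tgz -C /opt
cd /opt/kafka_2.11-0.9.0.1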

Kafka directory layout

  • /bin: executable scripts for operating Kafka, including the Windows variants
  • /config: configuration files
  • /libs: dependency libraries
  • /logs: log data directory; Kafka splits its server-side logs into five types: server, request, state, log-cleaner, and controller

Starting in standalone mode

  1. Edit the configuration file. Go to the root directory of the Kafka installation and edit config/server.properties.

The three most important broker settings are broker.id, log.dirs, and zookeeper.connect. The relevant parameters in config/server.properties are explained and edited as follows:

cd /opt/kafka_2.11-0.9.0.1/config
vi server.properties

Change the following entries in the configuration file (the annotations are placed on their own lines, since .properties files do not support inline comments):

# Unique ID of this broker within the cluster; assign 0, 1, 2, 3, 4, ... to successive brokers
broker.id=0
# Directory where Kafka stores its log data
log.dirs=/opt/kafka_2.11-0.9.0.1/logs
# Comma-separated list of ZooKeeper servers
zookeeper.connect=master:2181,slave1:2181,slave2:2181

Start the broker

First make sure ZooKeeper is running (a sketch for starting the instance bundled with Kafka follows the start command below). Then, from the Kafka directory, run:

nohup bin/kafka-server-start.sh config/server.properties&

If no errors are reported, the broker started successfully; the nohup ... & combination keeps it running in the background.
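
If no standalone ZooKeeper installation is available, Kafka ships with a convenience script and a sample configuration for a local single-node instance; a minimal sketch (run this before starting the broker):

# Start the single-node ZooKeeper bundled with Kafka, in the background
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

# Optional sanity check: jps (shipped with the JDK) should now list both JVMs,
# kafka.Kafka for the broker and org.apache.zookeeper.server.quorum.QuorumPeerMain for ZooKeeper
jps -l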

  • Check ports 2181 and 9092
    netstat -tunlp|egrep "(2181|9092)"
    tcp        0      0 :::2181       :::*    LISTEN      19787/java          
    tcp        0      0 :::9092       :::*    LISTEN      28094/java 
    

    Explanation: the Kafka process has PID 28094 and is listening on port 9092.

QuorumPeerMain is the corresponding ZooKeeper instance, with PID 19787, listening on port 2181.

A quick test

Open two terminals and, from the Kafka directory, run the following commands, one in each:

  • Start a producer
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
  • Start a consumer
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    

    Type arbitrary text at the producer prompt and check that the consumer receives it correctly (a note on topic creation follows this list).

  • Note: the producer is given the socket localhost:9092, i.e. it sends its messages directly to Kafka (the broker).
    The consumer is given the socket localhost:2181: the old console consumer uses ZooKeeper for coordination (discovering brokers and tracking offsets), while the messages themselves are still fetched from the broker.
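
A note on the "test" topic: with the broker default auto.create.topics.enable=true, it is created automatically the first time the producer writes to it. If that setting has been turned off in your config, create the topic explicitly first; a sketch for the single-broker setup above:

# Create the test topic with one partition and a single replica (only one broker is running yet)
bin/kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --zookeeper localhost:2181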

So far we have only a single broker; next let's experiment with a multi-broker cluster.

Setting up a pseudo-cluster with multiple brokers (three Kafka brokers)

We have just started a single broker; now we will start a cluster made up of three brokers, all running on this same machine.

(1) Provide a configuration file for each broker

First, let's look at config/server0.properties:

broker.id=0
listeners=PLAINTEXT://:9092
port=9092
host.name=192.168.1.181
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.181:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests =500
log.cleanup.policy = delete
  • Notes:

broker.id uniquely identifies a node within the cluster. Because all the brokers here run on the same machine, each one must be given a different port and a different log directory so that they do not overwrite each other's data.

This also makes it clear why Kafka was listening on port 9092 in the single-broker experiment above.

How the Kafka cluster talks to ZooKeeper is likewise visible in the configuration (the zookeeper.connect setting).

Next, modeled on the file above, we provide configuration files for the other two brokers (a sketch for generating them follows the listings):

  • server1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    port=9093
    host.name=192.168.1.181
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs1
    num.partitions=5
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=192.168.1.181:2181
    zookeeper.connection.timeout.ms=6000
    queued.max.requests =500
    log.cleanup.policy = delete
    
  • server2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    port=9094
    host.name=192.168.1.181
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs2
    num.partitions=5
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=192.168.1.181:2181
    zookeeper.connection.timeout.ms=6000
    queued.max.requests =500
    log.cleanup.policy = delete
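
The two files above differ from server0.properties only in broker.id, the listener port, and log.dirs, so instead of writing them by hand they could be generated; a sketch, assuming the config/ layout used above:

# Derive server1.properties and server2.properties from server0.properties
# (run from the root of the Kafka installation)
for i in 1 2; do
  sed -e "s/^broker.id=0/broker.id=${i}/" \
      -e "s/9092/909$((2 + i))/" \
      -e "s|^log.dirs=/tmp/kafka-logs|log.dirs=/tmp/kafka-logs${i}|" \
      config/server0.properties > config/server${i}.properties
done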
    

Start all the brokers

The commands are as follows:

bin/kafka-server-start.sh config/server0.properties &   # start broker0
bin/kafka-server-start.sh config/server1.properties &   # start broker1
bin/kafka-server-start.sh config/server2.properties &   # start broker2
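
Alternatively, kafka-server-start.sh accepts a -daemon flag (present in the 0.9-era scripts, but check your copy of bin/kafka-server-start.sh to be sure) that detaches each broker and sends its output to the logs/ directory instead of the terminal:

# Start each broker as a detached daemon; output goes to the logs/ directory
bin/kafka-server-start.sh -daemon config/server0.properties
bin/kafka-server-start.sh -daemon config/server1.properties
bin/kafka-server-start.sh -daemon config/server2.properties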

Check ports 2181, 9092, 9093, and 9094

netstat -tunlp|egrep "(2181|9092|9093|9094)"
tcp        0      0 :::9093   :::*       LISTEN      29725/java          
tcp        0      0 :::2181   :::*       LISTEN      19787/java          
tcp        0      0 :::9094   :::*       LISTEN      29800/java          
tcp        0      0 :::9092   :::*       LISTEN      29572/java  

One ZooKeeper instance is listening on port 2181, and the three Kafka brokers are listening on ports 9092, 9093, and 9094 respectively.
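
Another way to confirm that all three brokers registered is to query the /brokers/ids path in ZooKeeper with the zookeeper-shell.sh script bundled with Kafka; a sketch (output formatting varies slightly between versions):

# List the broker ids registered in ZooKeeper; expect something like [0, 1, 2]
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids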

Create topics

bin/kafka-topics.sh --create --topic topic_1 --partitions 1 --replication-factor 3 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic topic_2 --partitions 1 --replication-factor 3 --zookeeper localhost:2181
bin/kafka-topics.sh --create --topic topic_3 --partitions 1 --replication-factor 3 --zookeeper localhost:2181

Check that the topics were created:

bin/kafka-topics.sh --list --zookeeper localhost:2181
test
topic_1
topic_2
topic_3
[root@atman081 kafka_2.10-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_3	Partition: 0	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1

Some of the output above may not be entirely clear yet; in brief, Leader is the broker currently serving reads and writes for the partition, Replicas is the full list of brokers holding a copy, and Isr is the set of replicas currently in sync with the leader. Note in particular that the leader of topic_1 is broker 2.

Simulating clients sending and receiving messages

  • Send messages
    bin/kafka-console-producer.sh --topic topic_1 --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    

    Receive messages

    bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 127.0.0.1:2181 --from-beginning
    

    Note that the topic the producer writes to is now replicated across three brokers, so the distributed nature of Kafka starts to show.

Killing a broker

Kill broker 0 (id=0)

First, from the configuration above we know that broker 0 (id=0) should be listening on port 9092, which lets us determine its PID (a sketch follows).
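
A sketch for locating and killing that broker, based on the netstat output shown earlier (substitute the PID reported on your own machine):

# Find the PID of the java process listening on 9092 (last column, e.g. 29572/java)
netstat -tunlp | grep 9092
# Kill that broker; 29572 was the PID reported for port 9092 above
kill 29572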

Topic state in the Kafka cluster before broker 0 is killed:

bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_3	Partition: 0	Leader: 2	Replicas: 0,2,1	Isr: 2,1,0

After killing broker 0, run --describe again and compare. The main change is clearly in the Isr column: broker 0 has dropped out of the in-sync replica set of every topic, and the test topic, which has replication factor 1 and lived only on broker 0, is left with no leader (Leader: -1).

bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: -1	Replicas: 0	Isr: 
Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1
Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2
Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_3	Partition: 0	Leader: 2	Replicas: 0,2,1	Isr: 2,1

Now test whether sending and receiving messages is affected.

Send messages

bin/kafka-console-producer.sh --topic topic_1 --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

Receive messages

bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 127.0.0.1:2181 --from-beginning

As we can see, Kafka's distributed, replicated design gives it quite good fault tolerance.

A brief introduction to Kafka

  1. What does Kafka consist of?
  • producer: produces messages, i.e. publishes them to Kafka

  • consumer: consumes messages, i.e. subscribes to them

  • broker: Kafka runs as a cluster of one or more servers; each server is called a broker

  • zookeeper: coordinates the brokers and consumers
