

Original source: Valleylord

Kafka Replicas and Clusters

In production, Kafka always runs as a cluster with partitioning, to guarantee reliability and performance. Below is an example of a 3-replica Kafka cluster.

First, start 3 Kafka brokers. Their configuration files are, respectively:

broker.id=0
listeners=PLAINTEXT://192.168.232.23:9092
log.dirs=/tmp/kafka-logs

broker.id=1
listeners=PLAINTEXT://192.168.232.23:9093
log.dirs=/tmp/kafka-logs-1

broker.id=2
listeners=PLAINTEXT://192.168.232.23:9094
log.dirs=/tmp/kafka-logs-2

Although only one port is configured per broker, Kafka actually occupies one more, presumably used for replication between brokers. In addition, all 3 brokers share the following settings:

zookeeper.connect=localhost:2181
delete.topic.enable=true

Brokers connected to the same ZooKeeper are grouped into one cluster. Note that none of these settings designates which broker is the master and which are the slaves; instead, Kafka elects a leader for each partition from among the eligible brokers. That is, for one topic, broker 0 may be the leader, while for another topic, broker 1 may be the leader; moreover, if topic1 has 2 partitions, partition 0's leader may be broker 0 while partition 1's leader is broker 1.
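The leader-spreading behavior follows from how replicas are laid out at topic creation time. Here is a minimal Python sketch of a round-robin assignment (a simplification of Kafka's actual algorithm; the function name and `start_index` parameter are illustrative):

```python
def assign_replicas(brokers, num_partitions, replication_factor, start_index=0):
    """Round-robin replica assignment: the first replica of each partition
    lands on a different broker, and that first replica is the preferred leader."""
    n = len(brokers)
    assignment = {}
    for p in range(num_partitions):
        first = (start_index + p) % n
        assignment[p] = [brokers[(first + i) % n] for i in range(replication_factor)]
    return assignment

# With 3 brokers and 2 partitions, this matches the Replicas columns
# shown by `kafka-topics.sh --describe` later in this article:
print(assign_replicas([0, 1, 2], 2, 3))
# {0: [0, 1, 2], 1: [1, 2, 0]}
```

Because each partition's replica list starts on a different broker, leadership is naturally spread across the cluster.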

This peer-to-peer design is very useful for failure recovery: when a node crashes, Kafka automatically elects an available follower and promotes it to leader. After the crashed node recovers and rejoins the cluster, Kafka adds it back to the set of available nodes and again automatically elects new leaders.
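The failover step can be modeled in a few lines of Python (a simplified illustration, not Kafka's implementation; the function name is made up): when the leader dies, the new leader is the first replica that is both alive and in the in-sync set.

```python
def elect_leader(replicas, isr, live_brokers):
    """Pick a new leader: the first replica (in assignment order) that is
    alive and still in sync. Returns None if no candidate exists (the
    partition goes offline unless unclean election is enabled)."""
    for b in replicas:
        if b in isr and b in live_brokers:
            return b
    return None

# Brokers 0 and 1 crash; broker 2 takes over both partitions:
print(elect_leader([0, 1, 2], isr={2}, live_brokers={2}))  # 2
print(elect_leader([1, 2, 0], isr={2}, live_brokers={2}))  # 2
```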

As an experiment, first create a topic with 3 replicas and 2 partitions:

bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1

Initially, topic1's state is as follows:

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0

Even without documentation, the output above is roughly self-explanatory: topic1 has 2 partitions, Partition 0 and Partition 1, whose leaders are on broker 0 and broker 1 respectively. Replicas shows which brokers hold a copy of each partition, and Isr (In-Sync Replicas) shows the brokers currently in sync. If a broker goes down, Replicas stays unchanged, but Isr shrinks to show only the brokers still alive, as the experiment below demonstrates.
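To watch this from a script rather than by eye, the describe output can be parsed. The sketch below (function names are illustrative) extracts one partition row and reports which replicas have fallen out of sync, i.e. the difference between Replicas and Isr:

```python
import re

_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+"
    r"Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)")

def parse_partition_line(line):
    """Parse one partition row of `kafka-topics.sh --describe` output."""
    part, leader, replicas, isr = _LINE.search(line).groups()
    return {"partition": int(part),
            "leader": int(leader),
            "replicas": [int(b) for b in replicas.split(",")],
            "isr": [int(b) for b in isr.split(",")]}

def out_of_sync(state):
    """Replicas stays fixed while Isr shrinks; the difference is the lagging set."""
    return sorted(set(state["replicas"]) - set(state["isr"]))

row = parse_partition_line(
    "Topic: topic1  Partition: 0  Leader: 2  Replicas: 0,1,2  Isr: 2")
print(row["leader"], out_of_sync(row))  # 2 [0, 1]
```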

Next, in 2 separate threads, run the Producer and Consumer sample code written earlier, with the producer sending asynchronously and messages replicated synchronously. While messages are flowing, kill -9 two of the brokers (broker 0 and broker 1) to simulate a sudden crash. topic1's state then becomes:
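"Synchronous replication" on the message path corresponds to the producer's acknowledgment setting. A hedged sketch of the relevant Java-producer properties (the values here are assumptions, not taken from the original sample code):

```properties
# Wait until the write has been replicated to all in-sync replicas before acking:
acks=all
# Retry sends that fail while leadership is moving between brokers:
retries=3
```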

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2

As you can see, Kafka has already elected a new leader, and message delivery was not interrupted. Now restart the two stopped brokers and check topic1's state again:

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2,1,0
$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0
        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 2,1,0

Notice that for a short while both partitions of topic1 had broker 2 as their leader, but after Kafka re-ran the election, partition 1's leader moved back to broker 1. This shows that Kafka prefers to place partition leaders on different brokers, which achieves better load balancing.
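The second describe output likely caught an intermediate state: partition 1's leadership has already moved back to broker 1, the first entry in its replica list, while partition 0 is still led by broker 2. With automatic leader rebalancing turned on (the broker setting auto.leader.rebalance.enable), Kafka periodically moves leadership back to each partition's preferred replica, i.e. the first broker in its Replicas list, once that broker is back in the ISR. A minimal sketch of that rule (simplified; the function name is illustrative):

```python
def preferred_leaders(partitions):
    """Move leadership back to the preferred replica (first in the replica
    list) whenever it is in sync; otherwise keep a leader from the ISR."""
    leaders = {}
    for p, s in partitions.items():
        preferred = s["replicas"][0]
        leaders[p] = preferred if preferred in s["isr"] else s["isr"][0]
    return leaders

# State after brokers 0 and 1 rejoin; rebalancing converges to one
# leader per broker, matching the describe output above for partition 1:
state = {0: {"replicas": [0, 1, 2], "isr": [2, 1, 0]},
         1: {"replicas": [1, 2, 0], "isr": [2, 1, 0]}}
print(preferred_leaders(state))  # {0: 0, 1: 1}
```

Under this rule partition 0's leadership would eventually return to broker 0 as well, leaving each broker leading at most one partition.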

Now look at the Producer and Consumer logs. The fragment below covers the moments before and after the 2 brokers went down:

......
Send     message: (00439, Message_00439) at offset 217 to partition(0) in 3 ms
Received message: (00438, Message_00438) at offset 216
Send     message: (00440, Message_00440) at offset 218 to partition(0) in 5 ms
Send     message: (00441, Message_00441) at offset 221 to partition(1) in 5 ms
Received message: (00441, Message_00441) at offset 221
Received message: (00439, Message_00439) at offset 217
Send     message: (00442, Message_00442) at offset 222 to partition(1) in 5 ms
Send     message: (00443, Message_00443) at offset 219 to partition(0) in 3 ms
Received message: (00440, Message_00440) at offset 218
Received message: (00443, Message_00443) at offset 219
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
Received message: (00442, Message_00442) at offset 222
Send     message: (00452, Message_00452) at offset 223 to partition(1) in 7492 ms
Send     message: (00454, Message_00454) at offset 224 to partition(1) in 7485 ms
Send     message: (00455, Message_00455) at offset 225 to partition(1) in 7482 ms
Send     message: (00458, Message_00458) at offset 226 to partition(1) in 7473 ms
Send     message: (00460, Message_00460) at offset 227 to partition(1) in 7467 ms
Send     message: (00461, Message_00461) at offset 228 to partition(1) in 7465 ms
Send     message: (00462, Message_00462) at offset 229 to partition(1) in 7462 ms
Send     message: (00463, Message_00463) at offset 230 to partition(1) in 7459 ms
Send     message: (00464, Message_00464) at offset 231 to partition(1) in 7456 ms
Send     message: (00465, Message_00465) at offset 232 to partition(1) in 7453 ms
......
Send     message: (01103, Message_01103) at offset 543 to partition(1) in 5478 ms
Received message: (00631, Message_00631) at offset 310
Received message: (00633, Message_00633) at offset 311
Send     message: (00451, Message_00451) at offset 220 to partition(0) in 7525 ms
Received message: (00634, Message_00634) at offset 312
Send     message: (00453, Message_00453) at offset 221 to partition(0) in 7518 ms
Received message: (00639, Message_00639) at offset 313
Send     message: (00456, Message_00456) at offset 222 to partition(0) in 7509 ms
Received message: (00641, Message_00641) at offset 314
Send     message: (00457, Message_00457) at offset 223 to partition(0) in 7506 ms
Received message: (00643, Message_00643) at offset 315
......