发布于

Kafka 是如何实现负载均衡的?

作者

Kafka 的生产者 / 消费者模型

在传统的消息中间件模型中,生产者(Producer)负责将消息发送到特定的主题(Topic)中,并交由 Broker 进行存储与转发,所生产的消息最终交由主题的订阅者(Subscriber)进行处理。在 Kafka 中,订阅者由消费者组(Consumer Group)实现,每个消费者组中包含一个或多个消费者(Consumer),此时 Topic 只是逻辑上的概念,具体的消息实体是由多个分区(Partition)组成的日志存储结构。如下图所示是三个 Kafka Broker 组成的集群,包括三个主题 topic1、topic2 和 topic3,其中每个 topic 包含三个分区,被 Kafka 尽可能的均匀分配到了每个 Broker 中。

Kafka生产者和消费者模型

Kafka 规定了每个消费者必须也只能从属于唯一一个消费者组,与此对应的,生产者在生产消息时需要将消息按照一定策略(轮询、随机等)投递到主题的某个分区中。Kafka 通过对主题进行分区实现了消息在主题层面的 1 : M 生产(一个生产者可以将消息投递到 Topic 的任意一个分区中),与此同时以消费者组为粒度对分区进行尽可能的均匀分配,并限制单个分区只能由消费者组中的单个消费者进行消费,进而实现了消息在消费者组层面的 1 : N 消费(一个消费者组可以同时消费多个分区)。不难发现,通过引入分区与消费者组的概念,Kafka 实现了一个非常灵活的 M : N 生产 / 消费模型。对于任意的主题,我们只需调整生产者的分区策略与消费者组的消费者数量,就能够实现发布 / 订阅模型(M : N)与点对点模型(1 : 1)。

每个主题的分区数可以在创建主题时手动指定,若不指定则由 Kafka 配置文件中的配置参数 num.partitions 决定(默认为 1)。主题的分区数一旦确定便只可增不可减。不难理解,Kafka 采用不允许缩减分区数量的设计一方面是为了保障数据的一致性和完整性,另一方面也是为了架构设计和实现上的简单高效(Kafka 的计算和存储并未分离,Broker 该如何处理被剔除的分区中存储的消息是个非常复杂的问题)。

同时,为了实现高可用和水平扩容,Kafka 为每个分区引入了分区副本(Partition Replica)的概念,每个分区副本分为主副本(Leader Replica)和从副本(Follower Replica),一般情况下,主分区副本负责向外提供消息读写服务,从分区副本只负责提供数据冗余,并定期异步的向主分区副本拉取最新数据进行同步,不对外提供服务。

Kafka 称分区的所有副本(包括主副本)为 AR(Assigned Replica),与主分区保持一定程度同步(可以配置 replica.lag.time.max.ms 来设置可容忍的从分区副本同步滞后时间,默认为 10 秒)的副本称为 ISR(In-Sync Replica)。我们可以通过 Kafka 提供的 kafka-topics.sh 脚本获取当前 Kafka 集群中某个主题的分区副本分配情况与当前状态:

> kafka-topics.sh --zookeeper localhost:10086/kafka --describe --topic topic-1
Topic:topic-1     PartitionCount:3      ReplicationFactor:3
Topic: topic-1    Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
Topic: topic-1    Partition: 1    leader: 2    Replicas: 2,1,3    Isr: 2,1,3
Topic: topic-1    Partition: 2    leader: 3    Replicas: 3,1,2    Isr: 3,1,2

若在某一时刻主分区副本不可用,则会在 ISR 中选举出一个分区作为新的主分区副本,进而实现故障转移,保障集群高可用。主题的分区副本数可以在创建主题时指定或采用默认的 default.replication.factor(默认为 1)。需要注意的是,分区副本的数量不能超过 Kafka 集群中可用的 Broker 数量,否则无法创建主题。

在引入分区副本后,消费者在消费消息前会先通过 ZooKeeper 中存储的信息定位待消费分区的主分区副本所在的 Kafka Broker,之后再向对应 Broker 拉取消息。同样的,生产者在生产消息时会通过特定的消息分配策略将消息分配到主题的某个主分区副本中。

Kafka 通过在生产者 / 消费者模型中引入消费者组与分区副本的设计,构建了一个高度灵活且可用的消息系统。但是,灵活也意味着复杂,分区副本的分配是否均匀、公平在很大程度上影响了 Kafka 集群中各 Broker 的工作负载。一个极端糟糕的分区副本分配方案可能会将所有主题的主分区倾斜在少量 Broker 上,从而导致集群内部的其它 Broker 无法承担主分区副本的读写负载,进而影响整个 Broker 集群的性能。此外,考虑到主题分区与 Kafka Broker 的数量都是可以动态扩容的,在扩容后该如何对资源进行重分配才能使得集群保持一定程度上的负载均衡是个非常复杂的问题,Kafka 在这一点上做的不好,也很难做好(Kafka 计算与存储不分离的设计导致分区迁移困难)。我们在下文中对于这类极端复杂的负载均衡问题不做讨论,而主要关注 Kafka 在一般情况下的负载均衡设计。

Broker 层面的负载均衡

Kafka Broker 分配到的主副本分区的数量在很大程度上决定了 Broker 的工作负载,如下图所示是主题 topic1 在由三个 Kafka Broker 组成的集群中的分区副本分配情况,其中 topic1 的配置为三分区三副本。

Kafka分区副本分配示例

一般情况下,Kafka 在初始化分区时会尽量将分区副本均匀分配,但 Broker 总会有宕机失效的时候,当主分区副本宕机后,Kafka 会在 ISR 中重新选举出新的 Broker 作为宕机分区副本新的 Leader。等到宕机的 Broker 重新上线后,其会作为 Follower 分区副本加入集群,此时 Kafka 集群内部的主分区副本分配就变得不均衡了。

为了避免上述情况,Kafka 在集群内部引入了优先副本(Preferred Replica)的概念,优先副本会在 Kafka 默认开启的分区重平衡(Partition Rebalancing)被触发时优先选则对应的 Broker 作为分区的 Leader。Kafka 会尽可能的按照我们预先配置的优先副本方案在 Broker 集群之间分配主分区副本。

实际上,我们在使用 kafka-topics.sh 脚本查看主题分区分配情况时,AR 列表的第一个 Broker 就是分区指定的优先副本。此外,分区副本在选举时并不是一定要选择优先副本对应的 Broker 为主分区副本,当优先副本对应的 Broker 不可用时,Kafka 会推举其他 ISR 作为分区的 Leader。通过指定优先副本并结合默认开启的重平衡机制,Kafka 在 Broker 层面实现了主分区副本的负载均衡。

需要注意的是,分区的重平衡是一个非常重的操作,对于 Kafka 集群的性能和稳定性的影响较大,一般情况下我们可以通过配置 auto.leader.rebalance.enable 选项来关闭 Broker 层面的分区自动重平衡,转而采用人工的方式在合适的时机手动触发,保障 Kafka 集群对外提供服务的可靠性。

生产者层面的负载均衡

正如前文所介绍的,Kafka 的主题消息投递与消费是以分区为基本单位的,生产者在投递消息到 Broker 时首先要确定消息投递到哪个分区中。我们在发送消息时,可以通过 Kafka 提供的 Producer API 指定投递的分区:

// Kafka 集群的地址和端口
String bootstrapServers = "localhost:10086";
// 配置生产者的属性
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 将 key / value 投递到 topic1 的分区 1
ProducerRecord<String, String> record = new ProducerRecord<>("topic1", 1, "key", "value");
producer.send(record);

若未指定分区号,Kafka 则会根据消息的 key 通过分区器(Partitioner)来选择具体的分区,默认分区器的分区策略是对 Key 进行哈希处理,这意味着具有相同 Key 的消息将被分配到同一个分区,从而保证了具有相同 Key 的消息将被按照一定顺序处理。此外,当传入的 Key 为 null 时将会 fallback 到轮询策略。

此外,我们还可以根据具体需要自定义分区器。实现自定义分区器的方式非常简单,只需要创建一个 Partitioner 的实现类:

CustomPartitoner.class
class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // 读取配置
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区逻辑
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // 根据消息的 hash 来选择分区
        int partition = Math.abs(key.hashCode()) % numPartitions;

        return partition;
    }

    @Override
    public void close() {
        // 自定义清理操作
    }
}

之后只需要在配置 Producer 时通过 PARTITIONER_CLASS_CONFIG 选择自定义的分区器即可:

// Kafka 集群的地址和端口
String bootstrapServers = "localhost:10086";
// 配置生产者的属性
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("topic1", 1, "key", "value");
producer.send(record);

消费者层面的负载均衡

Kafka 中主题订阅者的基本单位是消费者组,每个分区只能由消费者组中的一个消费者进行消费,多个消费者组之间对于分区的消费互不影响。Kafka 在消费者层面的负载均衡(再平衡)是以消费者组为单位展开的,消费者组中消费者数量的增减都会使得 Kafka 将分区副本按照一定规则重新分配到消费者组中。这里需要注意的是,不同消费者组之前的分区重分配是互不影响,独立进行的。

Kafka 提供了三个开箱即用的分区分配器:RangeAssignor(默认)、RoundRobinAssignor 以及 StickyAssignor。下面我将逐一介绍这三种分区分配器的负载均衡策略。

RangeAssignor

RangeAssignor 在分配分区时是按照下面的规则进行分配的:对于每个主题,首先将分消费者按照名词的字典序排序,之后将分区总数除以消费者总数获取分区平均值,溢出的余数部分则按顺序平均分配到前几个消费者中。

场景 1:假设有两个主题 t1t2,每个主题都有 4 个分区 p1p2p3p4,消费者组中存在 2 个消费者 c1c2 且都订阅了 t1t2,那么 RangeAssignor 的预期分区分配结果如下:

c1: t1p1, t1p2, t2p1, t2p2
c2: t1p3, t1p4, t2p3, t2p4

场景 2:假设有两个主题 t1t2,每个主题都有 3 个分区 p1p2p3,消费者组中存在 2 个消费者 c1c2 且都订阅了 t1t2,那么 RangeAssignor 的预期分区分配结果如下:

c1: t1p1, t1p2, t2p1, t2p2
c2: t1p3, t2p3

可以看到由于是以主题为单位对分区进行分配,RangeAssignor 在分区较少的情况下并不能保证分区的均匀分配。

RoundRobinAssignor

RoundRobinAssignor 在分配分区时是按照下面的规则进行分配的:将消费者组内所有消费者订阅的所有主题的分区按照字典序排序,之后以轮询的方式分配到每个消费者中,如果某个消费者没有订阅对应的主题,那么则不会将分区分配给它。

可以注意到,RoundRobinAssignor 在一个消费者组内所有消费者都订阅了同样主题的情况下是均匀的,在这种条件下避免了 RangeAssignor 的分配不均的问题,比如对于同样的场景 2,RoundRobinAssignor 的分配结果如下:

c1: t1p1, t1p3, t2p2
c2: t1p2, t2p1, t2p3

但当消费者组内消费者订阅的主题存在差异时,同样会存在分区分配不均匀的情况:

场景 3:假设有两个主题 t1t2,每个主题都有 3 个分区 p1p2p3,消费者组中存在 2 个消费者 c1c2,其中 c1 订阅了 t1t2c2 只订阅了 t2,RoundRobinAssignor 的预期分区分配结果如下:

c1: t1p1, t1p2, t1p3, t2c1
c2: t2c2

StickyAssignor

StickyAssignor 顾名思义,就是要在分区分配时在保证尽可能均匀的前提下与上次分配的情况保持一定程度上的一致,其在分配分区时与 RoundRobinAssignor 基本一致,但是当发生分区重分配时(消费者离开或加入),会尽可能的保留原本消费者的分区分配情况,进而减少分区的移动次数:

场景 4:假设有两个主题 t1t2,每个主题都有 2 个分区 p1p2,消费者组中存在 3 个消费者 c1c2c3,其中 c1c2c3 都订阅了 t1t2,StickyAssignor 的预期分区分配结果如下:

c1: t1p1, t2p2
c2: t1p2
c3: t2p1

此时若 c2 离开了消费者组,那么分区的分配情况会重新调整为:

c1: t1p1, t2p2
c3: t1p2, t2p1

如果采用 RoundRobinAssignor,则分区会重新调整为:

c1:t1p1, t2p1
c2: t1p2, t2p2

可以看到采用 StickyAssignor 只需将 c2 原本负责的 t1p2 移动到 c3 即可,而如果采用 RoundRobinAssignor 则还需要将 c3 原本负责的 t2p1 与 c1 负责的 t1p2 的分配互换,产生了额外的分区移动。

自定义 Assignor

与生产者层面的负载均衡相似,我们同样可以通过自定义分区分配器的方式按需实现自己的分区分配策略。实现自定义分区分配器的方式非常简单,具体流程与自定义 Partitioner 相似,只需要实现 ConsumerPartitionAssignor 接口:

CustomPartitionAssignor.class
public class CustomPartitionAssignor implements ConsumerPartitionAssignor {
    @Override
    public void configure(Map<String, ?> configs) {
        // 读取配置
    }

    @Override
    public String name() {
        // 设置自定义的 Assignor 的名字
        return "CustomPartitionAssignor";
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Cluster metadata, Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
        // 按需实现分区分配逻辑,返回的 Map 的 key 为消费者 ID,value 为分配给消费者的分区列表
        return null;
    }

    @Override
    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
        // 分区分配完成后的回调函数,可以实现打日志一类的操作
    }
}

并将具体实现类注册到客户端配置中即可:

Properties props = new Properties();
// 设置 Kafka 服务器地址和消费者相关属性
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:10086");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 注册自定义的分区分配器
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CustomPartitionAssignor.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("topic1"));