详解kafka中的消息分区分配算法

目录
  • 背景
  • RangeAssignor
    • 定义
    • 源码分析
    • 场景
  • RoundRobinAssignor
    • 定义
    • 源码分析
    • 场景
  • StickyAssignor
    • 定义
    • 场景

背景

kafka有分区机制,一个主题topic在创建的时候,会设置分区。如果只有一个分区,那所有的消费者都订阅的是这一个分区消息;如果有多个分区的话,那消费者之间又是如何分配的呢?

分配算法

RangeAssignor

定义

Kafka默认采⽤RangeAssignor的分配算法。

RangeAssignor策略的原理是按照消费者总数和分区总数进⾏整除运算来获得⼀个跨度,然 后将分区按照跨度进⾏平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每⼀个 Topic,RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然 后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配 ⼀个分区。

这种分配⽅式明显的⼀个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来 越严重,⽐如上图中4个分区3个消费者的场景,C0会多分配⼀个分区。如果此时再订阅⼀个分区 数为4的Topic,那么C0⼜会⽐C1、C2多分配⼀个分区,这样C0总共就⽐C1、C2多分配两个分区 了,⽽且随着Topic的增加,这个情况会越来越严重。

源码分析

public class RangeAssignor extends AbstractPartitionAssignor {
    ....
    @Override
    public Map> assign(Map partitionsPerTopic, Map subscriptions) {
        // 1. 获取每个topic被多少个consumer订阅了
        Map<String,List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        // 2. 存储最终的分配⽅案
        Map<String,List<String>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList());
        for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List consumersForTopic = topicEntry.getValue();
            // 3. 每个topic的partition数量
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
            continue;
            Collections.sort(consumersForTopic);
            // 4. 表示平均每个consumer会分配到多少个partition
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // 5. 平均分配后还剩下多少个partition未被分配
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
            List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            // 6. 这⾥是关键点,分配原则是将未能被平均分配的partition分配到前 consumersWithExtraPartition个consumer
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
                }
            }
            return assignment;
    }
}

场景

可以完全平均分配

无法完全平均分配,排序靠前分的更多

消费者数量大于分区数量,排名靠前先分得,排名靠后未分得分区

RoundRobinAssignor

定义

RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进⾏排序后尽 量均衡的分配(RangeAssignor是针对单个Topic的分区进⾏排序分配的)。如果消费组内,消费者订阅 的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间 分配到的分区数的差值不会超过1)。

源码分析

package org.apache.kafka.clients.consumer;
public class RoundRobinAssignor extends AbstractPartitionAssignor {
@Override
public Map> assign(Map partitionsPerTopic, Map subscriptions) {
        <Map> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); // 1. 环状链表,存储所有的consumer,⼀次迭代完之后⼜会回到原点
        CircularIterator assigner = new CircularIterator<> (Utils.sorted(subscriptions.keySet())); // 2. 获取所有订阅的topic的partition总数 for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
        final String topic = partition.topic();
        while (!subscriptions.get(assigner.peek()).topics().contains(topic))
            assigner.next();
            assignment.get(assigner.next()).add(partition);
        }
        return assignment;
    }
.... }

场景

无法完全平均分配,排序靠前分的更多

StickyAssignor

定义

尽管RoundRobinAssignor已经在RangeAssignor上做了⼀些优化来更均衡的分配分区,但是在⼀些情况下依旧会产⽣严重的分配偏差,从字⾯意义上看,Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每⼀次分配变更相对 上⼀次分配做最少的变动(上⼀次的结果是有粘性的) 其⽬标有两点:

  • 分区的分配尽量的均衡
  • 每⼀次重分配的结果尽量与上⼀次分配结果保持⼀致

场景

到此这篇关于详解kafka中的消息分区分配算法的文章就介绍到这了,更多相关kafka消息分区分配算法内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • kafka手动调整分区副本数的操作步骤

    目录 前言 前置准备 操作步骤 增加副本 前言 在生产环境中,akfka集群下的每台服务器的配置和性能可能不一样,但Kafka集群只会根据配置规则创建对应的分区副本,这样一来可能就会导致个别服务器存储压力较大. 在这种情况下,就需要手动调整分区副本的存储.我们不妨看看下面的这张图就明白了 上图的传达的意思是: broker0和broker1所在的服务器配置较高,存储容量较大,但是broker2和broker3所在的机器配置稍差存储容量较小,一开始创建出主题之后,集群只会按照默认的配置规则,将le

  • Java kafka如何实现自定义分区类和拦截器

    生产者发送到对应的分区有以下几种方式: (1)指定了patition,则直接使用:(可以查阅对应的java api, 有多种参数) (2)未指定patition但指定key,通过对key的value进行hash出一个patition: (3)patition和key都未指定,使用轮询选出一个patition. 但是kafka提供了,自定义分区算法的功能,由业务手动实现分布: 1.实现一个自定义分区类,CustomPartitioner实现Partitioner import org.apache

  • 解决kafka消息堆积及分区不均匀的问题

    目录 kafka消息堆积及分区不均匀的解决 1.先在kafka消息中创建 2.添加配置文件application.properties 3.创建kafka工厂 4.展示kafka消费者 kafka出现若干分区不消费的现象 定位过程 验证 解决方法 kafka消息堆积及分区不均匀的解决 我在环境中发现代码里面的kafka有所延迟,查看kafka消息发现堆积严重,经过检查发现是kafka消息分区不均匀造成的,消费速度过慢.这里由自己在虚拟机上演示相关问题,给大家提供相应问题的参考思路. 这篇文章有点

  • 使用kafka如何选择分区数及kafka性能测试

    kafka选择分区数及kafka性能测试 1.简言 ​ 如何选择合适的分区,这是我们经常面临的问题,不过针对这个问题,在网上并没有搜到固定的答案.因此,今天在这里主要通过性能测试的工具来告诉如何选择相对应的kafka分区. 2.性能测试工具 ​ kafka本身提供了比较的性能测试工具,我们可以使用它来测试适用于我们机器的kafka分区. ① 生产者性能测试 分别创建三个topic,副本数设置为1. bin/kafka-topics.sh --zookeeper zk --create --rep

  • 详解kafka中的消息分区分配算法

    目录 背景 RangeAssignor 定义 源码分析 场景 RoundRobinAssignor 定义 源码分析 场景 StickyAssignor 定义 场景 背景 kafka有分区机制,一个主题topic在创建的时候,会设置分区.如果只有一个分区,那所有的消费者都订阅的是这一个分区消息:如果有多个分区的话,那消费者之间又是如何分配的呢? 分配算法 RangeAssignor 定义 Kafka默认采⽤RangeAssignor的分配算法. RangeAssignor策略的原理是按照消费者总数

  • 详解Java中Dijkstra(迪杰斯特拉)算法的图解与实现

    目录 简介 工作过程 总体思路 实现 小根堆 Dijsktra 测试 简介 Dijkstra(迪杰斯特拉)算法是典型的单源最短路径算法,用于计算一个节点到其他所有节点的最短路径.主要特点是以起始点为中心向外层层扩展,直到扩展到终点为止.Dijkstra算法是很有代表性的最短路径算法,在很多专业课程中都作为基本内容有详细的介绍,如数据结构,图论,运筹学等等.注意该算法要求图中不存在负权边.对应问题:在无向图G=(V,E)中,假设每条边E(i)的长度W(i),求由顶点V0到各节点的最短路径. 工作过

  • 详解Linux 中获取硬盘分区或文件系统的 UUID 的七种方法

    作为一个 Linux 系统管理员,你应该知道如何去查看分区的 UUID 或文件系统的 UUID.因为现在大多数的 Linux 系统都使用 UUID 挂载分区.你可以在 /etc/fstab 文件中可以验证. 有许多可用的实用程序可以查看 UUID.本文我们将会向你展示多种查看 UUID 的方法,并且你可以选择一种适合于你的方法. 何为 UUID? UUID 意即 通用唯一识别码 Universally Unique Identifier ,它可以帮助 Linux 系统识别一个磁盘分区而不是块设备

  • 详解Java中使用泛型实现快速排序算法的方法

    快速排序算法概念 快速排序一般基于递归实现.其思路是这样的: 1.选定一个合适的值(理想情况中值最好,但实现中一般使用数组第一个值),称为"枢轴"(pivot). 2.基于这个值,将数组分为两部分,较小的分在左边,较大的分在右边. 3.可以肯定,如此一轮下来,这个枢轴的位置一定在最终位置上. 4.对两个子数组分别重复上述过程,直到每个数组只有一个元素. 5.排序完成. 基本实现方式: public static void quickSort(int[] arr){ qsort(arr,

  • 详解Django中异步任务之django-celery

    Celery文档参考:http://docs.jinkan.org/docs/celery/ 参考文章:https://www.jb51.net/article/158046.htm Django中异步任务---django-celery Celery简单介绍: celery使用场景: 耗时任务定时任务 请求结果不怎么重要的 耗时任务比如:发送短信验证码我们可以先发送给客户任务状态(请求成功或失败) 请求结果重要的建议使用django实现 比如:支付 首先简单介绍一下,Celery 是一个强大的

  • 一文详解Redis中的持久化

    目录 1. 前言 2. RDB 2.1 手动触发 2.2 自动触发 3. bgsave大致流程 4. RDB持久化方式的优缺点 5. AOF 6. AOF的使用方式 7. AOF流程剖析 7.1 命令写入 7.2 文件同步 7.3 重写机制 7.4 重启加载 8. 问题定位与优化 8.1 关于fork操作 8.2 关于子进程开销 8.3 关于AOF追加阻塞 1. 前言 为什么要进行持久化?:持久化功能有效地避免因进程退出造成的数据丢失问题,当下次重启时利用之前持久化的文件即可实现数据恢复. 持久

  • 详解Angular中$cacheFactory缓存的使用

    最近在学习使用angular,慢慢从jquery ui转型到用ng开发,发现了很多不同点,继续学习吧: 首先创建一个服务,以便在项目中的controller中引用,服务有几种存在形式,factory();service();constant();value();provider();其中provider是最基础的,其他服务都是基于这个写的,具体区别这里就不展开了,大家可以看看源码:服务是各个controller之间通话的重要形式,在实际项目中会用的很多,下面是代码: angular.module

  • 详解JSP 中Spring工作原理及其作用

    详解JSP 中Spring工作原理及其作用 1.springmvc请所有的请求都提交给DispatcherServlet,它会委托应用系统的其他模块负责负责对请求进行真正的处理工作. 2.DispatcherServlet查询一个或多个HandlerMapping,找到处理请求的Controller. 3.DispatcherServlet请请求提交到目标Controller 4.Controller进行业务逻辑处理后,会返回一个ModelAndView 5.Dispathcher查询一个或多个

  • 详解Swift中对C语言接口缓存的使用以及数组与字符串转为指针类型的方法

    详解Swift中对C语言接口缓存的使用以及数组与字符串转为指针类型的方法 由于Swift编程语言属于上层编程语言,而Swift中由于为了低层的高性能计算接口,所以往往需要C语言中的指针类型,由此,在Swift编程语言刚诞生的时候就有了UnsafePointer与UnsafeMutablePointer类型,分别对应为const Type*类型与Type *类型. 而在Swift编程语言中,由于一般数组(Array)对象都无法直接用于C语言中含有指针类型的函数参数(比如:void*),所以往往需要

  • 详解jQuery中ajax.load()方法

    jQuery load() 方法 jQuery load() 方法是简单但强大的 AJAX 方法. load() 方法从服务器加载数据,并把返回的数据放入被选元素中. 语法: $(selector).load(URL,data,callback); load()函数用于从服务器加载数据,并使用返回的html内容替换当前匹配元素的内容. load()函数默认使用GET方式,如果提供了对象形式的数据,则自动转为POST方式. 因为默认使用的是Get请求方式,所以我们也可以在url加数据进行提交. 例

随机推荐