怎样给Kafka新增分区

目录
  • 给Kafka新增分区
    • 1、修改 topic 的分区
    • 2、迁移数据
    • 3、迁移
    • 4、验证
  • Kafka分区原理机制
    • 分区结构
    • 分区优点
    • 分区策略
    • 根据分区策略实现消息的顺序消费
    • 默认分区策略源码
  • 总结

给Kafka新增分区

数据量猛增的时候,需要给 kafka 的 topic 新增分区,增大处理的数据量,可以通过以下步骤

1、修改 topic 的分区

kafka-topics --zookeeper hadoop004:2181 --alter --topic flink-test-04 --partitions 3

2、迁移数据

生成迁移计划,手动新建一个 json 文件

{
"topics": [
{"topic": "flink-test-03"}
],
"version": 1
}

生成迁移计划

kafka-reassign-partitions --zookeeper hadoop004:2181 --topics-to-move-json-file topic.json --broker-list “120,121,122” --generate

Current partition replica assignment:

{"version":1,"partitions":[{"topic":"flink-test-02","partition":5,"replicas":[120]},{"topic":"flink-test-02","partition":0,"replicas":[121]},{"topic":"flink-test-02","partition":2,"replicas":[120]},{"topic":"flink-test-02","partition":1,"replicas":[122]},{"topic":"flink-test-02","partition":4,"replicas":[122]},{"topic":"flink-test-02","partition":3,"replicas":[121]}]}

新建一个文件reassignment.json,保存上边这些信息

3、迁移

kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --execute

4、验证

kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --verify

Kafka分区原理机制

分区结构

kafka的消息总共是三层结构

Topic(第一层结构,表示一个主题)-> Partition(分区,每个消息可以有多个分区) -> 消息实例(具体的消息文本等等,一个消息实例只可能在一个分区里面,不会出现在多个分区中)

分区优点

分区其实是一个负载均衡的思想。如此设计能使每一个分区独自处理单独的读写请求,提高吞吐量。

分区策略

  • 轮询策略Round-robin(未指定key新版本默认策略)
  • 随机策略Randomness(老版本默认策略)
  • 消息键排序策略Key-ordering(指定了key,则使用该策略)
  • 根据地理位置进行分区
  • 自定义分区 需要在生产者端实现org.apache.kafka.clients.producer.Partitioner接口,并配置一下实现类的全限定名

根据分区策略实现消息的顺序消费

可以只设置一个分区,这样子消息都是放在一个partition,肯定是先进先出进行消费,然而这种场景无法利用kafka多分区的高吞吐量以及负载均衡的优势。

将需要顺序消费的消息设置key,这个时候根据默认的分区策略,kafka会将所有的相同的key放在一个partition上面,这样既可以使用kafka的partition又可以实现顺序消费。

默认分区策略源码

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    public void configure(Map<String, ?> configs) {}
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }
    public void close() {}
}

从类注释当中已经很明显的看出来分区逻辑

3. 如果指定了分区,则使用指定分区

4. 如果没有指定分区,但是有key,则使用hash过的key放置消息

5. 如果没有指定分区,也没有key,则使用轮询

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 使用kafka如何选择分区数及kafka性能测试

    kafka选择分区数及kafka性能测试 1.简言 ​ 如何选择合适的分区,这是我们经常面临的问题,不过针对这个问题,在网上并没有搜到固定的答案.因此,今天在这里主要通过性能测试的工具来告诉如何选择相对应的kafka分区. 2.性能测试工具 ​ kafka本身提供了比较的性能测试工具,我们可以使用它来测试适用于我们机器的kafka分区. ① 生产者性能测试 分别创建三个topic,副本数设置为1. bin/kafka-topics.sh --zookeeper zk --create --rep

  • 解决kafka消息堆积及分区不均匀的问题

    目录 kafka消息堆积及分区不均匀的解决 1.先在kafka消息中创建 2.添加配置文件application.properties 3.创建kafka工厂 4.展示kafka消费者 kafka出现若干分区不消费的现象 定位过程 验证 解决方法 kafka消息堆积及分区不均匀的解决 我在环境中发现代码里面的kafka有所延迟,查看kafka消息发现堆积严重,经过检查发现是kafka消息分区不均匀造成的,消费速度过慢.这里由自己在虚拟机上演示相关问题,给大家提供相应问题的参考思路. 这篇文章有点

  • kafka手动调整分区副本数的操作步骤

    目录 前言 前置准备 操作步骤 增加副本 前言 在生产环境中,akfka集群下的每台服务器的配置和性能可能不一样,但Kafka集群只会根据配置规则创建对应的分区副本,这样一来可能就会导致个别服务器存储压力较大. 在这种情况下,就需要手动调整分区副本的存储.我们不妨看看下面的这张图就明白了 上图的传达的意思是: broker0和broker1所在的服务器配置较高,存储容量较大,但是broker2和broker3所在的机器配置稍差存储容量较小,一开始创建出主题之后,集群只会按照默认的配置规则,将le

  • 详解kafka中的消息分区分配算法

    目录 背景 RangeAssignor 定义 源码分析 场景 RoundRobinAssignor 定义 源码分析 场景 StickyAssignor 定义 场景 背景 kafka有分区机制,一个主题topic在创建的时候,会设置分区.如果只有一个分区,那所有的消费者都订阅的是这一个分区消息:如果有多个分区的话,那消费者之间又是如何分配的呢? 分配算法 RangeAssignor 定义 Kafka默认采⽤RangeAssignor的分配算法. RangeAssignor策略的原理是按照消费者总数

  • 怎样给Kafka新增分区

    目录 给Kafka新增分区 1.修改 topic 的分区 2.迁移数据 3.迁移 4.验证 Kafka分区原理机制 分区结构 分区优点 分区策略 根据分区策略实现消息的顺序消费 默认分区策略源码 总结 给Kafka新增分区 数据量猛增的时候,需要给 kafka 的 topic 新增分区,增大处理的数据量,可以通过以下步骤 1.修改 topic 的分区 kafka-topics --zookeeper hadoop004:2181 --alter --topic flink-test-04 --p

  • Apache Kafka 分区重分配的实现原理解析

    目录 一.前言 二.工具的使用 三.元数据管理及协调器 3.1 ZooKeeper 3.2 Kafka Controller 四.分区重分配流程分析 4.1 kafka-reassign-partitions 客户端 4.2 controller 维护分区的元数据信息 4.3 broker 端数据跨路径迁移 五.总结 本文作者为中国移动云能力中心大数据团队软件开发工程师孙大鹏,本文结合 2.0.0 版本的 Kafka 源码,详细介绍了 Kafka 分区副本重分配的流程和逻辑,供大家参考. 一.前

  • Mysql数据表分区技术PARTITION浅析

    在这一章节里, 我们来了解下 Mysql 中的分区技术 (RANGE, LIST, HASH)   Mysql 的分区技术与水平分表有点类似, 但是它是在逻辑层进行的水平分表, 对于应用而言它还是一张表, 换句话说: 分区不是实际真正的对一张表进行拆分,分区之后表还是一个表,它是把存储文件进行拆分. 在 Mysql 5.1(后) 有了几种分区类型:   RANGE分区: 基于属于一个给定连续区间的列值, 把多行分配给分区 LIST分区: 类似于按 RANGE 分区, 区别在于 LIST 分区是基

  • MySQL中表分区技术详细解析

    MySQL 分区技术(是mysql 5.1以版本后开始用->是甲骨文mysql技术团队维护人员以插件形式插入到mysql里面的技术) 1.概述 数据库单表到达一定量后,性能会有衰减,像mysql\sql server等犹为明显,所以需要把这些数据进行分区处理.同时有时候可能出现数据剥离什么的,分区表就更有用处了! MySQL 5.1 中新增的分区(Partition)功能就开始增加,优势也越来越明显了: 与单个磁盘或文件系统分区相比,可以存储更多的数据 很容易就能删除不用或者过时的数据 一些查询

  • mysql的分区技术详细介绍

    一.概述 当 MySQL的总记录数超过了100万后,会出现性能的大幅度下降吗?答案是肯定的,但是,性能下降>的比率不一而同,要看系统的架构.应用程序.还有>包括索引.服务器硬件等多种因素而定.当有网友问我这个问题的时候,我最常见的回答>就是:分表,可以根据id区间或者时间先后顺序等多种规则来分表.分表很容易,然而由此所带来的应用程序甚至是架构方面的改动工作却不>容小觑,还包括将来的扩展性等. 在以前,一种解决方案就是使用 MERGE 类型,这是一个非常方便的做饭.架构和程序基本上不

  • mysql分区功能详解,以及实例分析

    一,什么是数据库分区 前段时间写过一篇关于mysql分表的 的文章,下面来说一下什么是数据库分区,以mysql为例.mysql数据库中的数据是以文件的形势存在磁盘上的,默认放在/mysql/data下面 (可以通过my.cnf中的datadir来查看),一张表主要对应着三个文件,一个是frm存放表结构的,一个是myd存放表数据的,一个是myi存表 索引的.如果一张表的数据量太大的话,那么myd,myi就会变的很大,查找数据就会变的很慢,这个时候我们可以利用mysql的分区功能,在物理上将这 一张

  • MySQL数据库分区功能的使用教程

    零,什么是数据库分区 来说一下什么是数据库分区,以mysql为例.mysql数据库中的数据是以文件的形势存在磁盘上的,默认放在/mysql/data下面(可以通过my.cnf中的datadir来查看),一张表主要对应着三个文件,一个是frm存放表结构的,一个是myd存放表数据的,一个是myi存表索引的.如果一张表的数据量太大的话,那么myd,myi就会变的很大,查找数据就会变的很慢,这个时候我们可以利用mysql的分区功能,在物理上将这一张表对应的三个文件,分割成许多个小块,这样呢,我们查找一条

随机推荐