kafka手动调整分区副本数的操作步骤

目录
  • 前言
  • 前置准备
  • 操作步骤
  • 增加副本

前言

在生产环境中,akfka集群下的每台服务器的配置和性能可能不一样,但Kafka集群只会根据配置规则创建对应的分区副本,这样一来可能就会导致个别服务器存储压力较大。

在这种情况下,就需要手动调整分区副本的存储。我们不妨看看下面的这张图就明白了

上图的传达的意思是:

broker0和broker1所在的服务器配置较高,存储容量较大,但是broker2和broker3所在的机器配置稍差存储容量较小,一开始创建出主题之后,集群只会按照默认的配置规则,将leader和follower均匀的分配到不同的节点上去,并不知道具体哪一台机器的配置如何;

但实际上,我们希望某台服务器承担更多的读写压力,这样的话,就需要事后人为进行手动的对分区副本的调整

来看看我们的需求

创建一个新的topic,4个分区,两个副本,名为three,将 该topic的所有副本都存储到broker1和broker2两台服务器上

前置准备

  • centos7服务器,虚拟机或者云服务器
  • 提前安装好kafka集群,并启动集群,关于集群搭建,可以参考 kafka集群服役新节点

操作步骤

1、创建一个主题,4个分区,2个副本

./kafka-topics.sh --zookeeper IP:2181 --create --topic three --partitions 4 --replication-factor 2

2、检查下当前创建的topic情况

./kafka-topics.sh --zookeeper IP:2181 --describe --topic three

从副本的分布情况来看,副本目前尽可能的分布在不同的机器上,而我们的需求是期望所有的数据尽可能存储在broker1和broker2上面

3、创建副本存储计划

这个有点类似于前面的章节中我们讲到的服役新节点的操作,在bin目录下创建一个json文件,手动指定分区副本的存储位置

创建一个名字叫做 increase-replication-factor.json 的文件,将如下内容拷贝进去,关于下面这段内容,相信大家都能看得懂,实际操作过程中,需要结合自己kafka集群下的机器配置情况合理调整,我这里按照原始需求,假定broker3和broker4的配置稍差,于是将副本尽可能的配置到broker1和broker2上

{
  "version":1,
  "partitions":[{"topic":"three","partition":0,"replicas":[1,2]},
  {"topic":"three","partition":1,"replicas":[1,2]},
  {"topic":"three","partition":2,"replicas":[2,1]},
  {"topic":"three","partition":3,"replicas":[2,1]}]
}

4、执行存储计划

./kafka-reassign-partitions.sh --zookeeper IP:2181 --reassignment-json-file increase-replication-factor.json --execute

看到下面的内容,说明计划执行完毕,

5、验证第四步中的执行计划

./kafka-reassign-partitions.sh --zookeeper IP:2181 --reassignment-json-file increase-replication-factor.json --verify

看到下面的内容,说明副本执行计划完成执行

6、再次查看副本存储情况

./kafka-topics.sh --zookeeper IP:2181 --describe --topic three

与第一次的对比,重新分配之后,副本都转移到了broker1和broker2上面了

通过上面的操作步骤,我们就完成了手动调整分区副本的需求

增加副本

在生产环境中,假如某个主题重要等级需提升,则可以考虑在现有的基础上增加副本。副本数的增加需要先制定计划,然后根据计划执行,这个和之前的kafka集群服役新节点以及上文调整分区副本类似

下面来看看具体的操作步骤

1、创建一个新的主题,3个分区,1个副本

./kafka-topics.sh --zookeeper IP:2181 --create --topic zcy333 --partitions 3 --replication-factor 1

2、查看当前主题的分区副本存储情况

./kafka-topics.sh --zookeeper IP:2181 --describe --topic zcy333

从展示情况来看,目前副本只有一个,因此我们的需求是增加副本数量

3、创建副本存储计划

所有副本都指定存储在 broker1、broker2、broker3 中

增加一个increase-replication-factor.json的文件,将下面的内容拷贝进去

{
"version":1,
"partitions":[
  {"topic":"zcy333","partition":0,"replicas":[1,2,3]},
  {"topic":"zcy333","partition":1,"replicas":[1,2,3]},
  {"topic":"zcy333","partition":2,"replicas":[1,2,3]}
  ]
}

4、执行副本存储计划

./kafka-reassign-partitions.sh --zookeeper IP:2181 --reassignment-json-file increase-replication-factor.json --execute

看到下面的内容,说明执行成功

可以使用下面的命令,验证执行计划

./kafka-reassign-partitions.sh --zookeeper IP:2181 --reassignment-json-file increase-replication-factor.json --verify

5、再次查看执行后的主题分区副本情况

./kafka-topics.sh --zookeeper IP:2181 --describe --topic zcy333

可以对比执行后的情况与执行之前的情况,明显发现通过上面的操作,针对zcy333这个topic,副本增加了

通过上面的操作,我们成功对现有的topic的副本数进行了增加

到此这篇关于kafka手动调整分区副本数的文章就介绍到这了,更多相关kafka调整分区副本数内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 原因分析IDEA导入Spring-kafka项目Gradle编译失败

    目录 前言 异常信息 具体原因 解决问题 前言 最近在弄kafka相关的东东,因为是spring boot工程,所以用到了Spring-kafka,一个包含了kafka-producer和kafka-consumer自动装配的依赖.为了进一步研究spring是如何封装的kafka官方客户端的细节,所以从github上拉到了源码准备研究下,在导入到IDEA中时,因为Spring-kafka工程使用的是Gradle,导入时就编译失败了,导入工程失败. Spring-kafka地址:https://g

  • kafka并发写大消息异常TimeoutException排查记录

    目录 前言 定位异常点 分析抛异常的逻辑 真实原因-解决方案 结语 前言 先简单介绍下我们的使用场景,线上5台Broker节点的kafka承接了所有binlog订阅的数据,用于Flink组件接收数据做数据中台的原始数据.昨儿开发反馈,线上的binlog大量报错,都是kafka的异常,而且都是同一条topic抛的错,特征也很明显,发送的消息体非常大,主观判断肯定是写入大消息导致的超时了,异常详情如下: thread: kafka-producer-network-thread | producer

  • Kafka的监听地址配置实例详解

    有时我们会碰到网络是通畅的,但却连不上Kafka,特别是在多网卡环境或者云环境上很容易出现,这个其实和Kafka的监听配置有关系.本文介绍监听相关的配置,目前监听相关的参数主要有下面几个: listeners advertised.listeners listener.security.protocol.map inter.broker.listener.name security.inter.broker.protocol advertised.host.name(历史遗留,已废弃,勿使用)

  • 深入研究spring boot集成kafka之spring-kafka底层原理

    目录 前言 简单集成 引入依赖 添加配置 测试发送和接收 Spring-kafka-test嵌入式KafkaServer 引入依赖 启动服务 创建新的Topic 程序启动时创建TOPIC 代码逻辑中创建 PS:其他的方式创建TOPIC 引入依赖 api方式创建 命令方式创建 消息发送之KafkaTemplate探秘 获取发送结果 异步获取 同步获取 KAFKA事务消息 REPLYINGKAFKATEMPLATE获得消息回复 Spring-kafka消息消费用法探秘 @KAFKALISTENER的

  • Springboot整合kafka的示例代码

    目录 1. 整合kafka 2. 消息发送 2.1 发送类型 2.2 序列化 2.3 分区策略 3. 消息消费 3.1 消息组别 3.2 位移提交 1. 整合kafka 1.引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 2.设置yml文件 spring:

  • kafka手动调整分区副本数的操作步骤

    目录 前言 前置准备 操作步骤 增加副本 前言 在生产环境中,akfka集群下的每台服务器的配置和性能可能不一样,但Kafka集群只会根据配置规则创建对应的分区副本,这样一来可能就会导致个别服务器存储压力较大. 在这种情况下,就需要手动调整分区副本的存储.我们不妨看看下面的这张图就明白了 上图的传达的意思是: broker0和broker1所在的服务器配置较高,存储容量较大,但是broker2和broker3所在的机器配置稍差存储容量较小,一开始创建出主题之后,集群只会按照默认的配置规则,将le

  • 基于 Dubbo Admin 动态调整服务超时时间的操作步骤

    目录 开始之前 背景信息 操作步骤 规则详解 结果验证 Dubbo提供动态调整超时时间的服务治理能力,可以在无需重启应用的情况下,动态调整服务超时时间. Dubbo可以通过XML配置,注解配置,动态配置实现动态调整超时时间,这里主要介绍动态配置的方式,其他配置方式请参考旧文档配置 开始之前 请确保成功运行Dubbo-Admin 背景信息 在日常工作中会遇到各类超时配置,业务逻辑变更后,已有调用关系随着业务发展可能需要不断调整,相应服务接口响应时间的变化可能需要上线后才能确定.Dubbo-Admi

  • 手动释放Linux服务器内存(具体操作步骤)

    在服务器运行过程中,使用free -m查看服务器内存时,经常会发现free值很小,有些同学就会很紧张,总想采取一些措施,使得free值看起来高一点,心里就比较爽一点.其实,个人觉得这完全是图一时之快,没什么实质性用途. 一.大众释放内存方法1. 首先使用free -m查看剩余内存 复制代码 代码如下: linux-8v2i:~ # free -m             total       used       free     shared    buffers     cachedMem

  • linux文件系统调整大小的方法(linux调整分区大小)

    在使用linux的过程中, 有时会出现因为安装系统时分区不当导致有的分区空间不足,而有的分区空间过剩的情况.比如: 我在安装系统时给/usr/local分配器了5G的空间,但使用一段过程后发现, /usr/local最多只用到了1G.这样可以将/usr/local大小调整为1G. 空出4G留作他用.本文归纳了在不破快文件系统数据的前提下对文件系统大小进行调整的方法.这里采用的是"拆东墙, 补西墙"的方法. 当然, 如果你的磁盘中有未分区的空闲空间, 你就不用减小某个分区的空间了. 准备

  • react ant Design手动设置表单的值操作

    1.设置表单的值 this.props.form.setFieldsValue({ name:"张三", }); 2.清空表单的值 this.props.form.resetFields(); 3.获取某一输入框的值 this.props.form.getFieldValue('newPassword'); 4.获取整个表单的值 this.props.form.getFieldsValue(); 多看官方文档就知道这些东西了 补充知识:react使用antd表单赋值,用于修改弹框 1.

  • kafka与storm集群环境的安装步骤详解

    前言 在开始之前,需要说明下,storm和kafka集群安装是没有必然联系的,我将这两个写在一起,是因为他们都是由zookeeper进行管理的,也都依赖于JDK的环境,为了不重复再写一遍配置,所以我将这两个写在一起.若只需一个,只需挑选自己选择的阅读即可.下面话不多说了,来一起看看详细的介绍吧. 这两者的依赖如下: Storm集群:JDK1.8 , Zookeeper3.4,Storm1.1.1: Kafa集群 : JDK1.8 ,Zookeeper3.4 ,Kafka2.12: 说明: Sto

  • MySQL5.7的安装与配置详细操作步骤

    一.MySQL的下载  1.登陆MySQL的官网下载适用于64位系统的ZIP压缩包(https://dev.mysql.com/downloads/mysql/) 二.解压安装包 将下载的ZIP压缩包解压到任意文件夹.(此处为: C:\mysql5.7) 三.修改配置文件 将解压文件夹目录下的my-default.ini 文件重命名为 my.ini  . 用文本编辑器打开并清空其中内容. 添加内容(参考互联网): [mysql] # 设置mysql客户端默认字符集 default-charact

  • 使用anaconda的pip安装第三方python包的操作步骤

    相比于原生的python开发核心包,Anaconda已经集成了许多的第三方库,但是这在实际应用中是远远不够的,因此我们需要手动安装第三方库 使用pip可以快速的安装这些库 启动anaconda命令窗口: 开始> 所有程序> Anaconda Command Prompt 输入pip,可以查看pip指令的用法和相关的提示信息 pip install buitwith,可以开始安装buitwith库 以上这篇使用anaconda的pip安装第三方python包的操作步骤就是小编分享给大家的全部内容

  • MySQL绿色解压缩版安装与配置操作步骤

    操作步骤: 一.安装MySQL数据库 1.下载MySQL-5.6.17-winx64.zip文件. 2.解压到指定目录,本例为D:\mysql-5.6.17-winx64. 3.修改配置文件,my-default.ini名称改为:my.ini,文件里面的参数配置: [mysqld] # 设置mysql的安装目录 basedir=D:/ mysql-5.6.17-winx64 # 设置mysql数据库的数据的存放目录,必须是data datadir=D:/ mysql-5.6.17-winx64/

  • 动态设置django的model field的默认值操作步骤

    问题背景 django的model field需要动态设置默认值,具体案例如下: 原始代码如下,model是Application,其中字段ignore_fort的默认值设置为False class Application(TimestampedModel): name = models.CharField(max_length=255, null=True) ignore_fort = models.BooleanField(default=False) 然而现在有这样一个需求:default

随机推荐