Kafka Producer中的消息缓存模型图解详解

目录
  • 前言
  • 什么是消息累加器RecordAccumulator
  • 消息缓存模型
    • ProducerBatch的内存大小
  • 内存分配
    • Batch的创建和释放
  • 问题和答案
  • 总结

前言

在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果。

  1. 发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗?
  2. 当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了?
  3. 当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不足以加上上一个Batch中,会怎么办呢?
  4. 那么创建ProducerBatch的时候,应该分配多少的内存呢?

什么是消息累加器RecordAccumulator

kafka为了提高Producer客户端的发送吞吐量和提高性能,选择了将消息暂时缓存起来,等到满足一定的条件, 再进行批量发送, 这样可以减少网络请求,提高吞吐量。

而缓存这个消息的就是RecordAccumulator类.

上图就是整个消息存放的缓存模型,我们接下来一个个来讲解。

消息缓存模型

上图表示的就是 消息缓存的模型, 生产的消息就是暂时存放在这个里面。

  1. 每条消息,我们按照TopicPartition维度,把他们放在不同的Deque<ProducerBatch> 队列里面。
    TopicPartition相同,会在相同Deque<ProducerBatch> 的里面。
  2. ProducerBatch : 表示同一个批次的消息, 消息真正发送到Broker端的时候都是按照批次来发送的,
    这个批次可能包含一条或者多条消息。
  3. 如果没有找到消息对应的ProducerBatch队列, 则创建一个队列。
  4. 找到ProducerBatch队列队尾的Batch,发现Batch还可以塞下这条消息,则将消息直接塞到这个Batch中
  5. 找到ProducerBatch队列队尾的Batch,发现Batch中剩余内存,不够塞下这条消息,则会创建新的Batch
  6. 当消息发送成功之后, Batch会被释放掉。

ProducerBatch的内存大小

那么创建ProducerBatch的时候,应该分配多少的内存呢?

先说结论: 当消息预估内存大于batch.size的时候,则按照消息预估内存创建, 否则按照batch.size的大小创建(默认16k).

我们来看一段代码,这段代码就是在创建ProducerBatch的时候预估内存的大小

RecordAccumulator#append

       // 找到 batch.size 和 这条消息在batch中的总内存大小的 最大值
       int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
       // 申请内存
       buffer = free.allocate(size, maxTimeToBlock);

假设当前生产了一条消息为M, 刚好消息M找不到可以存放消息的ProducerBatch(不存在或者满了),那么这个时候就需要创建一个新的ProducerBatch了

预估消息的大小 跟batch.size 默认大小16384(16kb). 对比,取最大值用于申请的内存大小的值。

那么, 这个消息的预估是如何预估的?纯粹的是消息体的大小吗?

DefaultRecordBatch#estimateBatchSizeUpperBound

预估需要的Batch大小,是一个预估值,因为没有考虑压缩算法从额外开销

    /**
    * 使用给定的键和值获取只有一条记录的批次大小的上限。
    * 这只是一个估计,因为它没有考虑使用的压缩算法的额外开销。
    **/
    static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
        return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
    }
  1. 预估这个消息M的大小 + 一个RECORD_BATCH_OVERHEAD的大小
  2. RECORD_BATCH_OVERHEAD是一个Batch里面的一些基本元信息,总共占用了 61B
  3. 消息M的大小也并不是单单的只有消息体的大小,总大小=(key,value,headers)的大小+MAX_RECORD_OVERHEAD
  4. MAX_RECORD_OVERHEAD :一条消息头最大占用空间, 最大值为21B

也就是说创建一个ProducerBatch,最少就要83B .

比如我发送一条消息 " 1 " , 预估得到的大小是 86B, 跟batch.size(默认16384) 相比取最大值。 那么申请内存的时候取最大值 16384 。

关于Batch的结构和消息的结构,我们回头单独用一篇文章来讲解

内存分配

我们都知道RecordAccumulator里面的缓存大小是一开始定义好的, 由buffer.memory控制, 默认33554432 (32M)

当生产的速度大于发送速度的时候,就可能出现Producer写入阻塞。

而且频繁的创建和释放ProducerBatch,会导致频繁GC, 所有kafka中有个缓存池的概念,这个缓存池会被重复使用,但是只有固定( batch.size)的大小才能够使用缓存池。

PS:以下16k指得是 batch.size的默认值.

Batch的创建和释放

1. 内存16K 缓存池中有可用内存

①. 创建Batch的时候, 会去缓存池中,获取队首的一块内存ByteBuffer 使用。

②. 消息发送完成,释放Batch, 则会把这个ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear 清空数据。以便下次重复使用

2. 内存16K 缓存池中无可用内存

①. 创建Batch的时候, 去非缓存池中的内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池nonPooledAvailableMemory 减少 16K 的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。

②. 消息发送完成,释放Batch, 则会把这个ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear 清空数据, 以便下次重复使用

3. 内存非16K 非缓存池中内存够用

①. 创建Batch的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。

②. 消息发送完成,释放Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的Batch内存大小。 当然这个Batch会被GC掉

4. 内存非16K 非缓存池内存不够用

①. 先尝试将 缓存池中的内存一个一个释放到 非缓存池中, 直到非缓存池中的内存够用与创建Batch了

②. 创建Batch的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。

③. 消息发送完成,释放Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的Batch内存大小。 当然这个Batch会被GC掉

例如: 下面我们需要创建 48k的batch, 因为超过了16k,所以需要在非缓存池中分配内存, 但是非缓存池中当前可用内存为0 , 分配不了, 这个时候就会尝试去 缓存池里面释放一部分内存到 非缓存池。

释放第一个ByteBuffer(16k) 不够,则继续释放第二个,直到释放了3个之后总共48k,发现内存这时候够了, 再去创建Batch。

注意:这里我们涉及到的 非缓存池中的内存分配, 仅仅指的的内存数字的增加和减少。

问题和答案

发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗?

当Broker挂掉了,Producer会提示下面的警告️, 但是发送消息过程中

这个消息体还是可以写入到 消息缓存中的,也仅仅是写到到缓存中而已。

WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available

当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不足以加上上一个Batch中,会怎么办呢?

那么会创建新的ProducerBatch。

那么创建ProducerBatch的时候,应该分配多少的内存呢?

触发创建ProducerBatch的那条消息预估大小大于batch.size ,则以预估内存创建。
否则,以batch.size创建。

还有一个问题供大家思考:

当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了?

总结

到此这篇关于Kafka Producer中消息缓存模型的文章就介绍到这了,更多相关Kafka Producer消息缓存模型内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Kafka producer端开发代码实例

    一.producer工作流程 producer使用用户启动producer的线程,将待发送的消息封装到一个ProducerRecord类实例,然后将其序列化之后发送给partitioner,再由后者确定目标分区后一同发送到位于producer程序中的一块内存缓冲区中.而producer的另外一个线程(Sender线程)则负责实时从该缓冲区中提取出准备就绪的消息封装进一个批次(batch),统一发送给对应的broker,具体流程如下图: 二.producer示例程序开发 首先引入kafka相关依赖

  • Kafka Java Producer代码实例详解

    根据业务需要可以使用Kafka提供的Java Producer API进行产生数据,并将产生的数据发送到Kafka对应Topic的对应分区中,入口类为:Producer Kafka的Producer API主要提供下列三个方法: public void send(KeyedMessage<K,V> message) 发送单条数据到Kafka集群 public void send(List<KeyedMessage<K,V>> messages) 发送多条数据(数据集)到

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • Kafka Producer中的消息缓存模型图解详解

    目录 前言 什么是消息累加器RecordAccumulator 消息缓存模型 ProducerBatch的内存大小 内存分配 Batch的创建和释放 问题和答案 总结 前言 在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果. 发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗? 当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了? 当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不

  • Android中的Bitmap缓存池使用详解

    本文介绍了如何使用缓存来提高UI的载入输入和滑动的流畅性.使用内存缓存.使用磁盘缓存.处理配置改变事件等方法将会有效的解决这个问题. 在您的UI中显示单个图片是非常简单的,如果您需要一次显示很多图片就有点复杂了.在很多情况下(例如使用 ListView, GridView 或者 ViewPager控件),显示在屏幕上的图片以及即将显示在屏幕上的图片数量是非常大的(例如在图库中浏览大量图片). 在这些控件中,当一个子控件不显示的时候,系统会重用该控件来循环显示 以便减少对内存的消耗.同时垃圾回收机

  • Django3中的自定义用户模型实例详解

    介绍 嗨,在这篇文章中,我们将学习在Django 3中创建自定义用户模型,并且还将更改Django Admin的默认登录功能. 我们将使用电子邮件和密码登录. 动机 我必须为我的应用程序创建一个"自定义用户",我能够创建模型,但问题是createsuperuser命令无法正常工作. 为了调试它,我不得不做大量研究,问题是当时大多数资源已经过时,所以我决定写这篇文章. 我做了一个GitHub仓库,所以如果您愿意,可以直接使用它(这里有说明) 让我们开始吧 首先,创建一个Django Pr

  • C++ 中消息队列函数实例详解

    C++ 中消息队列函数实例详解 1.消息队列结构体的定义 typedef struct{ uid_t uid; /* owner`s user id */ gid_t gid; /* owner`s group id */ udi_t cuid; /* creator`s user id */ gid_t cgid; /* creator`s group id */ mode_t mode; /* read-write permissions 0400 MSG_R 0200 MSG_W*/ ul

  • 对Django中内置的User模型实例详解

    User模型 User模型是这个框架的核心部分.他的完整的路径是在django.contrib.auth.models.User. 字段 内置的User模型拥有以下的字段: 1.username: 用户名.150个字符以内.可以包含数字和英文字符,以及_.@.+..和-字符.不能为空,且必须唯一! 2.first_name:歪果仁的first_name,在30个字符以内.可以为空. 3.last_name:歪果仁的last_name,在150个字符以内.可以为空. 4.email:邮箱.可以为空

  • 在Android中使用WebSocket实现消息通信的方法详解

    前言 消息推送功能可以说移动APP不可缺少的功能之一,一般简单的推送我们可以使用第三方推送的SDK,比如极光推送.信鸽推送等,但是对于消息聊天这种及时性有要求的或者三方推送不满足业务需求的,我们就需要使用WebSocket实现消息推送功能. 基本流程 WebSocket是什么,这里就不做介绍了,我们这里使用的开源框架是https://github.com/TakahikoKawasaki/nv-websocket-client 基于开源协议我们封装实现WebSocket的连接.注册.心跳.消息分

  • RocketMQ Push 消费模型示例详解

    目录 使用 DefaultMQPushConsumer 消费消息 基于长轮询机制的伪 push 实现 客户端侧发起的长轮询请求 服务端阻塞请求 客户端回调处理 客户端发起请求的底层逻辑 PullCallback 回调 总结 Push 模式是指由 Server 端来控制消息的推送,即当有消息到 Server 之后,会将消息主动投递给 client(Consumer 端). 使用 DefaultMQPushConsumer 消费消息 下面是使用 DefaultMQPushConsumer 消费消息的

  • Java内存模型JMM详解

    Java Memory Model简称JMM, 是一系列的Java虚拟机平台对开发者提供的多线程环境下的内存可见性.是否可以重排序等问题的无关具体平台的统一的保证.(可能在术语上与Java运行时内存分布有歧义,后者指堆.方法区.线程栈等内存区域). 并发编程有多种风格,除了CSP(通信顺序进程).Actor等模型外,大家最熟悉的应该是基于线程和锁的共享内存模型了.在多线程编程中,需要注意三类并发问题: ·原子性 ·可见性 ·重排序 原子性涉及到,一个线程执行一个复合操作的时候,其他线程是否能够看

  • java 中maven pom.xml文件教程详解

    maven pom.xml文件教程详解,具体内容如下所示: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.x

  • Java内存模型知识详解

    1. 概述 多任务和高并发是衡量一台计算机处理器的能力重要指标之一.一般衡量一个服务器性能的高低好坏,使用每秒事务处理数(Transactions Per Second,TPS)这个指标比较能说明问题,它代表着一秒内服务器平均能响应的请求数,而TPS值与程序的并发能力有着非常密切的关系.在讨论Java内存模型和线程之前,先简单介绍一下硬件的效率与一致性. 2.硬件的效率与一致性 由于计算机的存储设备与处理器的运算能力之间有几个数量级的差距,所以现代计算机系统都不得不加入一层读写速度尽可能接近处理

随机推荐