Kafka消费客户端协调器GroupCoordinator详解

目录
  • 协调器的生命周期
    • GroupCoordinator的创建
    • offsetConfig相关配置
  • groupConfig相关配置
    • groupMetadataManager
    • heartbeatPurgatory
    • GroupCoordinator的启动
    • GroupCoordinator OnElection
    • GroupCoordinator onResignation

协调器的生命周期

  • 什么是协调器
  • 协调器工作原理
  • 协调器的Rebalance机制

GroupCoordinator的创建

在Kafka启动的时候, 会自动创建并启动GroupCoordinator

这个GroupCoordinator对象创建的时候传入的几个属性需要介绍一下

    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)

offsetConfig相关配置

  private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
    maxMetadataSize = config.offsetMetadataMaxSize,
    loadBufferSize = config.offsetsLoadBufferSize,
    offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
    offsetsTopicNumPartitions = config.offsetsTopicPartitions,
    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
    offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
  )
属性 介绍 默认值
offset.metadata.max.bytes    
offsets.load.buffer.size    
offsets.retention.minutes    
offsets.retention.check.interval.ms    
offsets.topic.num.partitions    
offsets.commit.timeout.ms    
offsets.topic.segment.bytes    
offsets.topic.replication.factor    
offsets.topic.compression.codec    
offsets.commit.timeout.ms    
offsets.commit.required.acks    

groupConfig相关配置

属性 介绍 默认值
group.min.session.timeout.ms    
group.max.session.timeout.ms    
group.initial.rebalance.delay.ms    
group.max.size    
group.initial.rebalance.delay.ms    

groupMetadataManager

组元信息管理类

heartbeatPurgatory

心跳监测操作,每一秒执行一次

joinPurgatory

GroupCoordinator的启动

  def startup(enableMetadataExpiration: Boolean = true): Unit = {
    info("Starting up.")
    groupManager.startup(enableMetadataExpiration)
    isActive.set(true)
    info("Startup complete.")
  }

这个启动对于GroupCoordinator来说只是给属性isActive标记为了true, 但是同时呢也调用了GroupMetadataManager.startup

定时清理Group元信息

这个Group元信息管理类呢启动了一个定时任务, 名字为:delete-expired-group-metadata

每隔600000ms的时候就执行一下 清理过期组元信息的操作, 这个600000ms时间是代码写死的。

TODO:GroupMetadataManager#cleanupGroupMetadata

GroupCoordinator OnElection

当内部topic __consumer_offsets 有分区的Leader变更的时候,比如触发了 LeaderAndIsr的请求, 发现分区Leader进行了切换。

那么就会执行 GroupCoordinator#OnElection 的接口, 这个接口会把任务丢个一个单线程的调度程序, 专门处理offset元数据缓存加载和卸载的。线程名称前缀为group-metadata-manager- ,一个分区一个任务

最终执行的任务内容是:GroupMetadataManager#doLoadGroupsAndOffsets

__consumer_offsets 的key有两种消息类型

key version 0: 消费组消费偏移量信息 -> value version 0: [offset, metadata, timestamp]

key version 1: 消费组消费偏移量信息-> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]

key version 2: 消费组的元信息 -> value version 0: [protocol_type, generation, protocol, leader,

例如 version:3 的schemaForGroupValue

Version-0

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-1

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-2

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-3

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		group_instance_id: NULLABLE_STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Value每个版本的 Scheme如下

  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))

GroupCoordinator onResignation

以上就是Kafka消费客户端协调器GroupCoordinator详解的详细内容,更多关于Kafka GroupCoordinator的资料请关注我们其它相关文章!

(0)

相关推荐

  • Go操作Kafka和Etcd方法详解

    目录 操作Kafka sarama 下载及安装 注意事项 连接 kafka 发送消息 连接 kafka 消费消息 操作Etcd 安装 put和get操作 watch操作 安装报错: 操作Kafka Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能.持久化.多副本备份.横向扩展等特点.本文介绍了如何使用 Go 语言发送和接收 kafka 消息. sarama Go 语言中连接 kafka 使用第三方库:github.com/Shopify

  • kafka运维consumer-groups.sh消费者组管理

    目录 消费者组管理 kafka-consumer-groups.sh 1. 查看消费者列表--list 2. 查看消费者组详情--describe 3. 删除消费者组--delete 4. 重置消费组的偏移量 --reset-offsets 5. 删除偏移量delete-offsets 消费者组管理 kafka-consumer-groups.sh 1. 查看消费者列表--list sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9

  • 使用jmx exporter采集kafka指标示例详解

    目录 预置条件 使用JMX exporter暴露指标 kafka集群启用监控 采集producer/consumer的指标 预置条件 安装kafka.prometheus 使用JMX exporter暴露指标 下载jmx exporter以及配置文件.Jmx exporter中包含了kafka各个组件的指标,如server metrics.producer metrics.consumer metrics等,但这些指标并不是prometheus格式的,因此需要通过重命名方式转变为promethe

  • Kafka消费客户端协调器GroupCoordinator详解

    目录 协调器的生命周期 GroupCoordinator的创建 offsetConfig相关配置 groupConfig相关配置 groupMetadataManager heartbeatPurgatory GroupCoordinator的启动 GroupCoordinator OnElection GroupCoordinator onResignation 协调器的生命周期 什么是协调器 协调器工作原理 协调器的Rebalance机制 GroupCoordinator的创建 在Kafka

  • React为什么需要Scheduler调度器原理详解

    目录 正文 我们为什么需要Scheduler(调度器) Scheduler如何进行工作 总结 正文 最近在重学React,由于近两年没使用React突然重学发现一些很有意思的概念,首先便是React的Scheduler(调度器) 由于我对React的概念还停留在React 15之前(就是那个没有hooks的年代),所以接触Scheduler(调度器) 让我感觉很有意思: 在我印象中React的架构分为两层(React 16 之前) Reconciler(协调器)—— 负责找出变化的组件 Rend

  • AngularJS中的拦截器实例详解

    AngularJS中的拦截器实例详解 异步操作 有时候需要在拦截器中做一些异步操作.幸运的是, AngularJS 允许我们返回一个 promise 延后处理.它将会在请求拦截器中延迟发送请求或者在响应拦截器中推迟响应. 下面是项目中用到的代码. ZbtjxcApp.factory('myHttpInterceptor', ['$q', '$window','$location', function($q, $window,$location) { return { // 全局响应 'respo

  • Socket+JDBC+IO实现Java文件上传下载器DEMO详解

    该demo实现的功能有: 1.用户注册: 注册时输入两次密码,若两次输入不一致,则注册失败,需要重新输入.若用户名被注册过,则提示用户重新输入用户名: 2.用户登录: 需要验证数据库中是否有对应的用户名和密码,若密码输错三次,则终止用户的登录操作: 3.文件上传: 从本地上传文件到文件数据库中 4.文件下载: 从数据库中下载文件到本地 5.文件更新: 根据id可更新数据库中的文件名 6.文件删除: 根据id删除数据库中某一个文件 7.看数据库所有文件; 8.查看文件(根据用户名); 9.查看文件

  • python中函数总结之装饰器闭包详解

    1.前言 函数也是一个对象,从而可以增加属性,使用句点来表示属性. 如果内部函数的定义包含了在外部函数中定义的对象的引用(外部对象可以是在外部函数之外),那么内部函数被称之为闭包. 2.装饰器 装饰器就是包装原来的函数,从而在不需要修改原来代码的基础之上,可以做更多的事情. 装饰器语法如下: @deco2 @deco1 def func(arg1,arg2...): pass 这个表示了有两个装饰器的函数,那么表示的含义为:func = deco2(deco1(func)) 无参装饰器语法如下:

  • Mybatis Plugin拦截器开发过程详解

    这篇文章主要介绍了Mybatis Plugin拦截器开发过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.Plugin MyBatis 允许使用插件来拦截的方法调用包括: • Executor (update, query, flushStatements, commit, rollback,getTransaction, close, isClosed) • ParameterHandler (getParameterObject,

  • ffmpeg播放器实现详解之框架搭建过程

    ffplay是ffmpeg源码中一个自带的开源播放器实例,同时支持本地视频文件的播放以及在线流媒体播放,功能非常强大. FFplay: FFplay is a very simple and portable media player using the FFmpeg libraries and the SDL library. It is mostly used as a testbed for the various FFmpeg APIs. ffplay中的代码充分调用了ffmpeg中的函

  • ffmpeg播放器实现详解之视频显示(推荐)

    FFmpeg是一套可以用来记录.转换数字音频.视频,并能将其转化为流的开源计算机程序.它包括了目前领先的音/视频编码库libavcodec. FFmpeg是在 Linux 下开发出来的,但它可以在包括 Windows 在内的大多数操作系统中编译.这个项目是由 Fabrice Bellard 发起的,现在由 Michael Niedermayer 主持.可以轻易地实现多种视频格式之间的相互转换,例如可以将摄录下的视频avi等转成现在视频网站所采用的flv格式. ffplay是ffmpeg源码中一个

  • python爬虫中的url下载器用法详解

    前期的入库筛选工作已经由url管理器完成了,整理的工作自然要由url下载器接手.当我们需要爬取的数据已经去重后,下载器的主要任务的是这些数据下载下来.所以它的使用也并不复杂,不过需要借助到我们之前所学过的一个库进行操作,相信之前的基础大家都学的很牢固.下面小编就来为大家介绍url下载器及其使用的方法. 下载器的作用就是接受URL管理器传递给它的一个url,然后把该网页的内容下载下来.python自带有urllib和urllib2等库(这两个库在python3中合并为urllib),它们的作用就是

  • Java Spring拦截器案例详解

    springmvc提供了拦截器,类似于过滤器,他将在我们的请求具体出来之前先做检查,有权决定接下来是否继续,对我们的请求进行加工. 拦截器,可以设计多个. 通过实现handlerunterceptor,这是个接口 定义了非常重要的三个方法: 后置处理 前置处理 完成处理 案例一: 通过拦截器实现方法耗时统计与警告 package com.xy.interceptors; import org.springframework.web.servlet.HandlerInterceptor; impo

随机推荐