Kafka源码系列教程之删除topic

前言

Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

本文依然是以kafka0.8.2.2为例讲解

一,如何删除一个topic

删除一个topic有两个关键点:

1,配置删除参数

delete.topic.enable这个Broker参数配置为True。

2,执行

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

假如不配置删除参数为true的话,topic其实并没有被清除,只是被标记为删除。此时,估计一般人的做法是删除topic在Zookeeper的信息和日志,其实这个操作并不会清除kafkaBroker内存的topic数据。所以,此时最佳的策略是配置删除参数为true然后,重启kafka。

二,重要的类介绍

1,PartitionStateMachine

该类代表分区的状态机。决定者分区的当前状态,和状态转移。四种状态

  • NonExistentPartition
  • NewPartition
  • OnlinePartition
  • OfflinePartition

2,ReplicaManager

负责管理当前机器的所有副本,处理读写、删除等具体动作。

读写:写获取partition对象,再获取Replica对象,再获取Log对象,采用其管理的Segment对象将数据写入、读出。

3,ReplicaStateMachine

副本的状态机。决定者副本的当前状态和状态之间的转移。一个副本总共可以处于一下几种状态的一种
NewReplica:Crontroller在分区重分配的时候可以创建一个新的副本。只能接受变为follower的请求。前状态可以是NonExistentReplica

OnlineReplica:新启动的分区,能接受变为leader或者follower请求。前状态可以是NewReplica, OnlineReplica or OfflineReplica

OfflineReplica:死亡的副本处于这种状态。前状态可以是NewReplica, OnlineReplica

ReplicaDeletionStarted:分本删除开始的时候处于这种状态,前状态是OfflineReplica

ReplicaDeletionSuccessful:副本删除成功。前状态是ReplicaDeletionStarted

ReplicaDeletionIneligible:删除失败的时候处于这种状态。前状态是ReplicaDeletionStarted

NonExistentReplica:副本成功删除之后处于这种状态,前状态是ReplicaDeletionSuccessful

4,TopicDeletionManager

该类管理着topic删除的状态机

1),TopicCommand通过创建/admin/delete_topics/<topic>,来发布topic删除命令。

2),Controller监听/admin/delete_topic子节点变动,开始分别删除topic

3),Controller有个后台线程负责删除Topic

三,源码彻底解析topic的删除过程

此处会分四个部分:

A),客户端执行删除命令作用

B),不配置delete.topic.enable整个流水的源码

C),配置了delete.topic.enable整个流水的源码

D),手动删除zk上topic信息和磁盘数据

1,客户端执行删除命令

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

进入kafka-topics.sh我们会看到

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@

进入TopicCommand里面,main方法里面

else if(opts.options.has(opts.deleteOpt))
 deleteTopic(zkClient, opts)

实际内容是

val topics = getTopics(zkClient, opts)
if (topics.length == 0) {
 println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
}
topics.foreach { topic =>
 try {
 ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))

在"/admin/delete_topics"目录下创建了一个topicName的节点。

2,假如不配置delete.topic.enable整个流水是

总共有两处listener会响应:

A),TopicChangeListener

B),DeleteTopicsListener

使用topic的删除命令删除一个topic的话,指挥触发DeleteTopicListener。

var topicsToBeDeleted = {
 import JavaConversions._
 (children: Buffer[String]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
topicsToBeDeleted --= nonExistentTopics
if(topicsToBeDeleted.size > 0) {
 info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
 // mark topic ineligible for deletion if other state changes are in progress
 topicsToBeDeleted.foreach { topic =>
 val preferredReplicaElectionInProgress =
  controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
 val partitionReassignmentInProgress =
  controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
 if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
  controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
 }
 // add topic to deletion list
 controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}

由于都会判断delete.topic.enable是否为true,假如不为true就不会执行,为true就进入执行

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

3,delete.topic.enable配置为true

此处与步骤2的区别,就是那两个处理函数。

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

markTopicIneligibleForDeletion函数的处理为

if(isDeleteTopicEnabled) {
 val newTopicsToHaltDeletion = topicsToBeDeleted & topics
 topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
 if(newTopicsToHaltDeletion.size > 0)
 info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
}

主要是停止删除topic,假如存储以下三种情况

* Halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic

enqueueTopicsForDeletion主要作用是更新删除topic的集合,并激活TopicDeleteThread

def enqueueTopicsForDeletion(topics: Set[String]) {
 if(isDeleteTopicEnabled) {
 topicsToBeDeleted ++= topics
 partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
 resumeTopicDeletionThread()
 }
}

在删除线程DeleteTopicsThread的doWork方法中

topicsQueuedForDeletion.foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
 if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
 // clear up all state for this topic from controller cache and zookeeper
 completeDeleteTopic(topic)
 info("Deletion of topic %s successfully completed".format(topic))
 }

进入completeDeleteTopic方法中

// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)

主要作用是解除掉监控分区变动的listener,删除Zookeeper具体节点信息,删除磁盘数据,更新内存数据结构,比如从副本状态机里面移除分区的具体信息。

其实,最终要的是我们的副本磁盘数据是如何删除的。我们重点介绍这个部分。

首次清除的话,在删除线程DeleteTopicsThread的doWork方法中

{
 // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
 // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
 // or there is at least one failed replica (which means topic deletion should be retried).
 if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
 // mark topic for deletion retry
 markTopicForDeletionRetry(topic)
 }

进入markTopicForDeletionRetry

val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
 .format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)

在ReplicaStateMachine的handleStateChanges方法中,调用了handleStateChange,处理OfflineReplica

// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)

接着在handleStateChanges中

brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)

给副本数据存储节点发送StopReplicaKey副本指令,并开始删除数据

stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
 val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
 val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
 debug("The stop replica request (delete = true) sent to broker %d is %s"
 .format(broker, stopReplicaWithDelete.mkString(",")))
 debug("The stop replica request (delete = false) sent to broker %d is %s"
 .format(broker, stopReplicaWithoutDelete.mkString(",")))
 replicaInfoList.foreach { r =>
 val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
  Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
 controller.sendRequest(broker, stopReplicaRequest, r.callback)
 }
}
stopReplicaRequestMap.clear()

Broker的KafkaApis的Handle方法在接受到指令后

case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)

接着是在stopReplicas方法中

{
 controllerEpoch = stopReplicaRequest.controllerEpoch
 // First stop fetchers for all partitions, then stop the corresponding replicas
 replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
 for(topicAndPartition <- stopReplicaRequest.partitions){
 val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
 responseMap.put(topicAndPartition, errorCode)
 }
 (responseMap, ErrorMapping.NoError)
}

进一步进入stopReplica方法,正式进入日志删除

getPartition(topic, partitionId) match {
 case Some(partition) =>
 if(deletePartition) {
  val removedPartition = allPartitions.remove((topic, partitionId))
  if (removedPartition != null)
  removedPartition.delete() // this will delete the local log
 }

以上就是kafka的整个日志删除流水。

4,手动删除zk上topic信息和磁盘数据

TopicChangeListener会监听处理,但是处理很简单,只是更新了

val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren

val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>

四,总结

Kafka的topic的删除过程,实际上就是基于Zookeeper做了一个订阅发布系统。Zookeeper的客户端创建一个节点/admin/delete_topics/<topic>,由kafka Controller监听到事件之后正式触发topic的删除:解除Partition变更监听的listener,清除内存数据结构,删除副本数据,删除topic的相关Zookeeper节点。

delete.topic.enable配置该参数为false的情况下执行了topic的删除命令,实际上未做任何动作。我们此时要彻底删除topic建议修改该参数为true,重启kafka,这样topic信息会被彻底删除,已经测试。

一般流行的做法是手动删除Zookeeper的topic相关信息及磁盘数据但是这样的话会造成部分内存数据未清除。至于是否会有隐患,未测试。

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • 详解使用docker搭建kafka环境

    Requirements 最近学习了下kafka,为方便搭建环境,使用docker进行部署. 需要首先安装docker的环境.要求操作系统是linux的64位系统. docker的安装(适于rpm/deb安装): curl -fsSL https://get.docker.com/ | sh docker-compose的安装: curl -L https://github.com/docker/compose/releases/download/1.7.0/docker-compose-`un

  • Java使用kafka发送和生产消息的示例

    1. maven依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> 2. 生产者代码 package com.lnho.example.kafka; import org.apache.kafka.c

  • Linux下Kafka单机安装配置方法(图文)

    介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计.这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: •Kafka将消息以topic为单位进行归纳. •将向Kafka topic发布消息的程序成为producers. •将预订topics并消费消息的程序成为consumer. •Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker. producers通过网络将消息发送到Kafka集群,集群

  • kafka生产实践(详解)

    1.引言 最近接触到一个APP流量分析的项目,类似于友盟.涉及到几个C端(客户端)高并发的接口,这几个接口主要用于C端数据的提交.在没有任何缓冲的情况下,一个接口涉及到5张表的提交.压测的结果很不理想,主要瓶颈就在与RDS的交互. 一台双核,16G机子,单实例,jdbc最大连接数100,吞吐量竟然只有50TPS. 能想到的改造方案就是引入一层缓冲,让C端接口不与RDS直接交互,很自然就想到了rabbitmq,但是rabbitmq对分布式的支持比较一般,我们的数据体量也比较大,所以我们借鉴了友盟,

  • Kafka使用入门教程第1/2页

    介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计.这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: •Kafka将消息以topic为单位进行归纳. •将向Kafka topic发布消息的程序成为producers. •将预订topics并消费消息的程序成为consumer. •Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker. producers通过网络将消息发送到Kafka集群,集群

  • Kafka 常用命令行详细介绍及整理

     Kafka 常用命令行详细介绍及整理 以下是kafka常用命令行总结: 1.查看topic的详细信息 ./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1 2.为topic增加副本 ./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execu

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • spring boot与kafka集成的简单实例

    本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下: 引入相关依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.

  • Kafka源码系列教程之删除topic

    前言 Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一.Kafka使用Scala和Java进行编写.Apache Kafka是一个快速.可扩展的.高吞吐.可容错的分布式发布订阅消息系统.Kafka具有高吞吐量.内置分区.支持数据副本和容错的特性,适合在大规模消息处理场景中使用. 本文依然是以kafka0.8.2.2为例讲解 一,如何删除一个topic 删除一个topic有两个关键点: 1,配置删除参数 dele

  • vue从使用到源码实现教程详解

    搭建环境 项目github地址 项目中涉及了json-server模拟get请求,用了vue-router: 关于Vue生命周期以及vue-router钩子函数详解 生命周期 1.0版本 1.哪些生命周期接口 init Created beforeCompile Compiled Ready Attatched Detached beforeDestory destoryed 2.执行顺序 1. 不具有keep-alive 进入: init->create->beforeCompile->

  • 企业级使用LAMP源码安装教程

    目录 LAMP架构 1.lamp介绍 2.web服务工作流程 web服务器的资源分为俩种:静态和动态资源 web服务器如何处理客户端的请求 2.1cgi和fastcgi 2.2httpd与php结合 2.3web工作流程 3.LAMP平台构建 环境: lamp安装的顺序: 3.1安装httpd 3.2安装mysql 3.3安装php 3.4配置php 3.5配置apache 4.博客创建1 5.服务开机自启配置选择性使用 LAMP架构 (同一台服务器上搭建) 1.lamp介绍 lamp,由lin

  • windows下使用 intellij idea 编译 kafka 源码环境

    1. 从 GitHub 网站,git clone kafka 源码 2. 下载安装好 gradle,scala 2.1 从 dependencies.gradle 文件中找到 gradle 的版本,然后下载指定版本,并配置好 GRADLE_HOME 环境变量 3. 进入 kafka 项目目录,依次执行 gradle wrapper,gradle idea,gradle build --exclude-task test 4. 将工程导入到 idea 4.1 启动主类 kafka.Kafka 4.

  • redis源码分析教程之压缩链表ziplist详解

    前言 压缩列表(ziplist)是由一系列特殊编码的内存块构成的列表,它对于Redis的数据存储优化有着非常重要的作用.这篇文章总结一下redis中使用非常多的一个数据结构压缩链表ziplist.该数据结构在redis中说是无处不在也毫不过分,除了链表以外,很多其他数据结构也是用它进行过渡的,比如前面文章提到的SortedSet.下面话不多说了,来一起看看详细的介绍吧. 一.压缩链表ziplist数据结构简介 首先从整体上看下ziplist的结构,如下图: 压缩链表ziplist结构图 可以看出

  • idea2020导入spring5.1的源码详细教程

    1.先来说下导入环境和工具: java版本:1.8 idea: idea2020 gradle:gradle5.3 2.gradle的安装 idea导入spring的源码时,每次构建的时候都会去下载gradle,所以如果我们能提前下好gradle可以提高构建速度. gradle下载地址:https://services.gradle.org/distributions/ 上面放的地址可以找到gradle的所有版本.(注意:构建spring源码是有版本要求的,所以尽量选择高版本的gradle) 下

  • Android源码系列之深入理解ImageView的ScaleType属性

    做Android开发的童靴们肯定对系统自带的控件使用的都非常熟悉,比如Button.TextView.ImageView等.如果你问我具体使用,我会给说:拿ImageView来说吧,首先创建一个新的项目,在项目布局文件中应用ImageView控件,代码如下: <?xml version="1.0" encoding="utf-8"?> <LinearLayout xmlns:android="http://schemas.android.

  • IDEA查看Scala的源码的教程图解

    1.下载源代码 点击这个链接:http://www.scala-lang.org/download/all.html 选择需要的版本点击打开: 在新打开的网页下方找到源代码下载项: 2.在intellij idea设置指向源代码 在intellij中打开File –> Project Structure 选择Global Libraries –> 选择Sources 在右侧面板中的Scala Library下方点击"+"按钮: 选择解压后的源码包的src目录: ok即可.

  • OpenSSL动态链接库源码安装教程

    Openssl 是一个开放源代码的SSL协议的产品实现,它采用C语言作为开发语言,具备了跨系统的性能.调用Openssl的函数就可以实现一个SSL加密的安全数据传输通道,从而保证客户端和服务器之间数据的安全. OpenSSL整个软件包大概可以分成三个主要的功能部分:SSL协议库.应用程序以及密码算法库.OpenSSL的目录结构自然也是围绕这三个功能部分进行规划的.作为一个基于密码学的安全开发包,OpenSSL提供的功能相当强大和全面,囊括了主要的密码算法.常用的密钥和证书封装管理功能以及SSL协

  • Java并发系列之Semaphore源码分析

    Semaphore(信号量)是JUC包中比较常用到的一个类,它是AQS共享模式的一个应用,可以允许多个线程同时对共享资源进行操作,并且可以有效的控制并发数,利用它可以很好的实现流量控制.Semaphore提供了一个许可证的概念,可以把这个许可证看作公共汽车车票,只有成功获取车票的人才能够上车,并且车票是有一定数量的,不可能毫无限制的发下去,这样就会导致公交车超载.所以当车票发完的时候(公交车以满载),其他人就只能等下一趟车了.如果中途有人下车,那么他的位置将会空闲出来,因此如果这时其他人想要上车

随机推荐