Spring Boot集群管理工具KafkaAdminClient使用方法解析

原理介绍

在Kafka官网中这么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准):

  • 创建Topic:createTopics(Collection<NewTopic> newTopics)
  • 删除Topic:deleteTopics(Collection<String> topics)
  • 罗列所有Topic:listTopics()
  • 查询Topic:describeTopics(Collection<String> topicNames)
  • 查询集群信息:describeCluster()
  • 查询ACL信息:describeAcls(AclBindingFilter filter)
  • 创建ACL信息:createAcls(Collection<AclBinding> acls)
  • 删除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
  • 查询配置信息:describeConfigs(Collection<ConfigResource> resources)
  • 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
  • 修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
  • 查询节点的日志目录信息:describeLogDirs(Collection<Integer> brokers)
  • 查询副本的日志目录信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
  • 增加分区:createPartitions(Map<String, NewPartitions> newPartitions)

其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。主要实现步骤:

客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
客户端发送请求至Kafka Broker。

Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
客户端接收相应的回执并进行解析处理。

和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和回执类的两个基本父类。

代码如下

@Component
public class KafkaConfig{

   // 配置Kafka
  public Properties getProps(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
/*    props.put("retries", 2); // 重试次数
    props.put("batch.size", 16384); // 批量发送大小
    props.put("buffer.memory", 33554432); // 缓存大小,根据本机内存大小配置
    props.put("linger.ms", 1000); // 发送频率,满足任务一个条件发送*/
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }

}
@RestController
public class KafkaTopicManager {

  @Autowired
  private KafkaConfig kafkaConfig;

  @GetMapping("createTopic")
  public void createTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    NewTopic newTopic = new NewTopic("test1",4, (short) 1);
    Collection<NewTopic> newTopicList = new ArrayList<>();
    newTopicList.add(newTopic);
    adminClient.createTopics(newTopicList);

    adminClient.close();
  }
  @GetMapping("deleteTopic")
  public void deleteTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    adminClient.deleteTopics(Arrays.asList("test1"));
    adminClient.close();
  }
  @GetMapping("listAllTopic")
  public void listAllTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    ListTopicsResult result = adminClient.listTopics();
    KafkaFuture<Set<String>> names = result.names();
    try {
      names.get().forEach((k)->{
        System.out.println(k);
      });
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
    adminClient.close();
  }
  @GetMapping("getTopic")
  public void getTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test"));

    Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values();

    if(values.isEmpty()){
      System.out.println("找不到描述信息");
    }else{
      for (KafkaFuture<TopicDescription> value : values) {
        System.out.println(value);
      }
    }
    adminClient.close();
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • SpringBoot Admin 使用指南(推荐)

    Spring Boot Admin 是一个管理和监控你的 Spring Boot 应用程序的应用程序. 这些应用程序通过 Spring Boot Admin Client(通过 HTTP)注册或者使用 Spring Cloud(例如 Eureka)发现. UI只是 Spring Boot Actuator 端点上的一个 AngularJs 应用程序. 快速开始 首先在 IDEA 创建一个 SpringBoot 项目,把它当作 server 端,工程如下: 然后在 pom.xml 中引入依赖: <

  • 使用spring-boot-admin对spring-boot服务进行监控的实现方法

    spring-boot-admin,简称SBA,是一个针对spring-boot的actuator接口进行UI美化封装的监控工具.他可以:在列表中浏览所有被监控spring-boot项目的基本信息,详细的Health信息.内存信息.JVM信息.垃圾回收信息.各种配置信息(比如数据源.缓存列表和命中率)等,还可以直接修改logger的level. 官网:https://github.com/codecentric/spring-boot-admin 使用指南:http://codecentric.

  • Spring Boot Admin微服务应用监控的实现

    Spring Boot Admin 可以对SpringBoot应用的各项指标进行监控,可以作为微服务架构中的监控中心来使用,本文将对其用法进行详细介绍. Spring Boot Admin 简介 SpringBoot应用可以通过Actuator来暴露应用运行过程中的各项指标,Spring Boot Admin通过这些指标来监控SpringBoot应用,然后通过图形化界面呈现出来.Spring Boot Admin不仅可以监控单体应用,还可以和Spring Cloud的注册中心相结合来监控微服务应

  • SpringBoot Admin用法实例讲解

    说明 Spring Boot Admin 是一个管理和监控你的 Spring Boot 应用程序的应用程序. 这些应用程序通过 Spring Boot Admin Client(通过 HTTP)注册或者使用 Spring Cloud(例如 Eureka)发现. UI只是 Spring Boot Actuator 端点上的一个 AngularJs 应用程序. 创建服务 创建spring boot 项目,引入依赖 <dependency> <groupId>de.codecentric

  • Spring Boot Admin邮件警报整合过程解析

    一.前言 在Spring Boot Admin Server 中撒送预警邮件通知是很简单的,只需要简单的几个配置就可以了. 二.代码演示 1.microservice-monitor-server-> pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=&quo

  • Spring Boot Admin Server管理客户端过程详解

    要通过Spring Boot Admin Server监视和管理微服务应用程序,应该添加Spring Boot Admin启动器客户端依赖项,并将Admin Server URI指向应用程序属性文件. 注 - 要监视应用程序,应为微服务应用程序启用Spring Boot Actuator端点. 首先,在构建配置文件中添加以下Spring Boot Admin启动程序客户端依赖项和Spring Boot启动程序执行程序依赖项. Maven用户可以在pom.xml 文件中添加以下依赖项 - <dep

  • 使用Spirng Boot Admin监控Spring Cloud应用项目

    一. 介绍 GitHub: https://github.com/codecentric/spring-boot-admin 官方文档: http://codecentric.github.io/spring-boot-admin/1.5.7/ (此文档为1.5.7版本的文档) The applications register with our Spring Boot Admin Client (via HTTP) or are discovered using Spring Cloud ®

  • Spring Boot集群管理工具KafkaAdminClient使用方法解析

    原理介绍 在Kafka官网中这么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准): 创建Topic:createTopics(Collection<NewTopic> newTopics) 删除Topic:deleteTopi

  • Spring Boot 集成接口管理工具 Knife4j

    目录 前言 集成过程 创建 Spring Boot 项目 添加依赖 配置添加 编写 Controller 层 启动测试 踩过的坑 空指针异常 请求路径未找到 总结 前言 之前介绍了如何在 Spring Boot 中集成 Swagger2 和 Swagger3,对于我们日常的接口管理已经够用了.但是作为一个颜值党,无论是 Swagger2 还是 Swagger3,都难以满足我们的审美.而且 Swagger2 和 Swagger3 都已经好久没更新了,更新还是比较慢的. 偶然之间发现了一个国产的接口

  • Docker Swarm集群管理的使用及原理解析

    Swarm 集群管理 简介 Docker Swarm 是 Docker 的集群管理工具.它将 Docker 主机池转变为单个虚拟 Docker 主机. Docker Swarm 提供了标准的 Docker API,所有任何已经与 Docker 守护程序通信的工具都可以使用 Swarm 轻松地扩展到多个主机. 支持的工具包括但不限于以下各项: Dokku Docker Compose Docker Machine 原理 如下图所示,swarm 集群由管理节点(manager)和工作节点(work

  • spring boot结合Redis实现工具类的方法示例

    自己整理了 spring boot 结合 Redis 的工具类 引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 加入配置 # Redis数据库索引(默认为0) spring.redis.database=0 # Redis

  • 详解SpringBoot2 使用Spring Session集群

    有几种办法: 1.扩展指定server 利用Servlet容器提供的插件功能,自定义HttpSession的创建和管理策略,并通过配置的方式替换掉默认的策略.缺点:耦合Tomcat/Jetty等Servlet容器,不能随意更换容器. 2.利用Filter 利用HttpServletRequestWrapper,实现自己的 getSession()方法,接管创建和管理Session数据的工作.spring-session就是通过这样的思路实现的. 3 利用spring session Spring

  • Kubernetes(K8S)容器集群管理环境完整部署详细教程-上篇

    Kubernetes(通常称为"K8S")是Google开源的容器集群管理系统.其设计目标是在主机集群之间提供一个能够自动化部署.可拓展.应用容器可运营的平台.Kubernetes通常结合docker容器工具工作,并且整合多个运行着docker容器的主机集群,Kubernetes不仅仅支持Docker,还支持Rocket,这是另一种容器技术.Kubernetes是一个用于容器集群的自动化部署.扩容以及运维的开源平台. 本文系列: Kubernetes(K8S)容器集群管理环境完整部署详

  • Kubernetes(K8S)容器集群管理环境完整部署详细教程-下篇

    本文系列: Kubernetes(K8S)容器集群管理环境完整部署详细教程-上篇 Kubernetes(K8S)容器集群管理环境完整部署详细教程-中篇 Kubernetes(K8S)容器集群管理环境完整部署详细教程-下篇 在前一篇文章中详细介绍了Kubernetes(K8S)容器集群管理环境完整部署详细教程-中篇,这里继续记录下Kubernetes集群插件等部署过程: 十一.Kubernetes集群插件 插件是Kubernetes集群的附件组件,丰富和完善了集群的功能,这里分别介绍的插件有cor

  • Kubernetes(K8S)容器集群管理环境完整部署详细教程-中篇

    本文系列: Kubernetes(K8S)容器集群管理环境完整部署详细教程-上篇 Kubernetes(K8S)容器集群管理环境完整部署详细教程-中篇 Kubernetes(K8S)容器集群管理环境完整部署详细教程-下篇 接着Kubernetes(K8S)容器集群管理环境完整部署详细教程-上篇继续往下部署: 八.部署master节点 master节点的kube-apiserver.kube-scheduler 和 kube-controller-manager 均以多实例模式运行:kube-sc

  • Quartz+Spring Boot实现动态管理定时任务

    项目实践过程中碰到一个动态管理定时任务的需求:针对每个人员进行信息的定时更新,具体更新时间可随时调整.启动.暂定等. 思路 将每个人员信息的定时配置保存到数据库中,这样实现了任务的动态展示和管理.任务的每一次新增或变更,都会去数据库变更信息. 设置一个统一的任务管理器,专门负责动态任务的增删改查. POM依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://mav

  • Java基于elasticsearch实现集群管理

    这篇文章主要介绍了java基于elasticsearch实现集群管理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 本篇文章主要是查看集群中的相关信息,具体请看代码和注释 @Test public void test45() throws UnknownHostException{ //1.指定es集群 cluster.name 是固定的key值,my-application是ES集群的名称 Settings settings = Settin

随机推荐