SpringBoot如何获取Kafka的Topic列表

目录
  • 写在前面
  • 命令行模式
  • 代码模式
  • 总结

写在前面

众所周知,kafka是现代流行的消息队列,它使用经典的消息订阅发布模式实现消息的流转,大部分代码结合kafka使用都是使用它的生产者和消费者来实现消息的传递,那么对于kafka的主题的管理怎么使用代码实现呢,这是今天要讲的主题

命令行模式

kafka要结合zookeeper使用,因为它把元数据信息交给了zookeeper管理,其实使用命令行命令很容易就能对topic进行管理,主要使用的命令是kafka-topics.sh

创建主题
kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --replication-factor 3 --partitions 3
查看主题列表
kafka-topics.sh --zookeeper localhost:2181 --list
查看主题状态
kafka-topics.sh --describe  --zookeeper 127.0.0.1:2181 --topic TestTopic 

代码模式

那么话说回来如何使用代码实现topic的管理呢,那么现在就来看一下代码的实现方式,此处使用springboot2框架实现。

首先引进依赖kafka的相关

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

创建一个测试类进行测试

public static void main(String[] args) {
        Properties properties =  new Properties();
        properties.put("bootstrap.servers", "10.0.59.11:9093");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        AdminClient adminClient = AdminClient.create(properties);
        ListTopicsResult result = adminClient.listTopics();
        KafkaFuture<Set<String>> names = result.names();
        try {
            names.get().forEach((k)->{
                System.out.println(k);
            });
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        adminClient.close();
    }

这里面最主要的就是AdminClient这个类,AdminClient实现了Admin接口,Admin里面定义了许多和kafka配置相关的东西

让我们依次来看一下

public abstract class AdminClient implements Admin {
    public AdminClient() {
    }

    public static AdminClient create(Properties props) {
        return (AdminClient)Admin.create(props);
    }

    public static AdminClient create(Map<String, Object> conf) {
        return (AdminClient)Admin.create(conf);
    }
}

而Admin接口里有以下方法

static Admin create(Properties props) {
        return KafkaAdminClient.createInternal(new AdminClientConfig(props, true), (TimeoutProcessorFactory)null);
    }

    static Admin create(Map<String, Object> conf) {
        return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), (TimeoutProcessorFactory)null);
    }

    default void close() {
        this.close(9223372036854775807L, TimeUnit.MILLISECONDS);
    }

    /** @deprecated */
    @Deprecated
    default void close(long duration, TimeUnit unit) {
        this.close(Duration.ofMillis(unit.toMillis(duration)));
    }

    void close(Duration var1);

    default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
        return this.createTopics(newTopics, new CreateTopicsOptions());
    }

    CreateTopicsResult createTopics(Collection<NewTopic> var1, CreateTopicsOptions var2);

    default DeleteTopicsResult deleteTopics(Collection<String> topics) {
        return this.deleteTopics(topics, new DeleteTopicsOptions());
    }

    DeleteTopicsResult deleteTopics(Collection<String> var1, DeleteTopicsOptions var2);

    default ListTopicsResult listTopics() {
        return this.listTopics(new ListTopicsOptions());
    }

    ListTopicsResult listTopics(ListTopicsOptions var1);

    default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
        return this.describeTopics(topicNames, new DescribeTopicsOptions());
    }

    DescribeTopicsResult describeTopics(Collection<String> var1, DescribeTopicsOptions var2);

    default DescribeClusterResult describeCluster() {
        return this.describeCluster(new DescribeClusterOptions());
    }

    DescribeClusterResult describeCluster(DescribeClusterOptions var1);

    default DescribeAclsResult describeAcls(AclBindingFilter filter) {
        return this.describeAcls(filter, new DescribeAclsOptions());
    }

    DescribeAclsResult describeAcls(AclBindingFilter var1, DescribeAclsOptions var2);

通过名称我们可以看出,里面有创建Topic,有删除Topic,有列出所有Topic,有描述Topic

我们通过这些方法可以管理Kafka的Topic

最后我们来看一下实现效果

控制台打印里面有3个Topic

去服务器命令行验证一下

也是3个说明代码没问题

总结

网上大多数关于kafka的代码实现都是关于生产者和消费者的实现,今天主要是使用一下kakfa的配置管理类,实现对topic的管理,以此记录作为以后工作中的参考。希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 深入研究spring boot集成kafka之spring-kafka底层原理

    目录 前言 简单集成 引入依赖 添加配置 测试发送和接收 Spring-kafka-test嵌入式KafkaServer 引入依赖 启动服务 创建新的Topic 程序启动时创建TOPIC 代码逻辑中创建 PS:其他的方式创建TOPIC 引入依赖 api方式创建 命令方式创建 消息发送之KafkaTemplate探秘 获取发送结果 异步获取 同步获取 KAFKA事务消息 REPLYINGKAFKATEMPLATE获得消息回复 Spring-kafka消息消费用法探秘 @KAFKALISTENER的

  • Springboot集成Kafka进行批量消费及踩坑点

    目录 引入依赖 创建配置类 Kafka 消费者 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> </dependency> 因为我的项目的 springboot 版本是 1.5.22.RELE

  • SpringBoot集成kafka全面实战记录

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章. 一.生产者实践 普通生产者 带回调的生产者 自定义分区器 kafka事务提交 二.消费者实践 简单消费 指定topic.partition.offset消费 批量消费 监听异常处理器 消息过滤器 消息转发 定时启动/停止监听器 一.前戏 1.在项目中连接kafka,因为是外网,首先要开放kafka配置文件

  • 关于spring boot整合kafka+注解方式

    目录 spring boot自动配置方式整合 spring boot自动配置的不足 spring boot下手动配置kafka 批量消费消息 spring boot整合kafka报错 spring boot自动配置方式整合 spring boot具有许多自动化配置,对于kafka的自动化配置当然也包含在内,基于spring boot自动配置方式整合kafka,需要做以下步骤. 引入kafka的pom依赖包 <!-- https://mvnrepository.com/artifact/org.s

  • SpringBoot如何获取Kafka的Topic列表

    目录 写在前面 命令行模式 代码模式 总结 写在前面 众所周知,kafka是现代流行的消息队列,它使用经典的消息订阅发布模式实现消息的流转,大部分代码结合kafka使用都是使用它的生产者和消费者来实现消息的传递,那么对于kafka的主题的管理怎么使用代码实现呢,这是今天要讲的主题 命令行模式 kafka要结合zookeeper使用,因为它把元数据信息交给了zookeeper管理,其实使用命令行命令很容易就能对topic进行管理,主要使用的命令是kafka-topics.sh 创建主题 kafka

  • 实例详解Android 获取短信会话列表

    Android中的短信并没有正式的content provider可用,在官方文档中没有提供定义.不过依然可以自己定义好URI,然后查询出短信内容.例如conetent://sms则是所有短信所在的path. 要将短信按会话分类,原先我是查询出所有短信后,然后再按照thread_id分类.系统自带的短信程序包含一个会话显示界面,每个条目包含:联系人.短信数量.第一条短信等内容.当我的程序处理的短信较多时,一次查询出所有的短信就变得很慢.(如果再加上为每个会话查询联系人信息,则会更慢) 看了系统短

  • Python中如何获取类属性的列表

    前言 最近工作中遇到个需求是要得到一个类的静态属性,也就是说有个类 Type ,我要动态获取 Type.FTE 这个属性的值. 最简单的方案有两个: getattr(Type, 'FTE') Type.__dict__['FTE'] 那么,如果要获取类属性的列表,该怎么做呢? 首先上场的是 dir ,它能返回当前范围的所有属性名称列表: >>> dir() ['__builtins__', '__doc__', '__name__', '__package__'] >>>

  • java如何获取本地操作系统进程列表

    package com.wa.xwolf.sblog.util; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.charset.Charset; public class ProcessUtil { public static void main(String[] args) throws Exception

  • C#操作DataTable方法实现过滤、取前N条数据及获取指定列数据列表的方法

    本文实例讲述了C#操作DataTable方法实现过滤.取前N条数据及获取指定列数据列表的方法.分享给大家供大家参考.具体分析如下: #region DataTable筛选,排序返回符合条件行组成的新DataTable或直接用DefaultView按条件返回 /// <summary> /// DataTable筛选,排序返回符合条件行组成的新DataTable ///或直接用DefaultView按条件返回 /// eg:SortExprDataTable(dt,"Sex='男'&q

  • springboot如何获取相对路径文件夹下静态资源的方法

    今日遇到一个问题:springboot需要获取到一个自定义名称文件夹下的静态资源(图片等),并且文件夹的路径不在classPath下面,而是一个相对路径. 一开始使用修改配置文件的方式: # 配置静态资源访问前缀 spring.mvc.static-path-pattern=*/** # 配置静态资源路径,默认配置失效 spring.resources.static-locations=../upload 发现并不行,无法解析出相对路径. 后面通过自定义静态资源映射配置类实现了: @Config

  • Python获取时间范围内日期列表和周列表的函数

    Python获取时间范围内日期列表和周列表的函数 1.获取日期列表 # -*- coding=utf-8 -*- import datetime def dateRange(beginDate, endDate): dates = [] dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d") date = beginDate[:] while date <= endDate: dates.append(date) dt = d

  • Python实现使用dir获取类的方法列表

    使用Python的内置方法dir,可以范围一个模块中定义的名字的列表. 官方解释是: Docstring: dir([object]) -> list of strings If called without an argument, return the names in the current scope. Else, return an alphabetized list of names comprising (some of) the attributes of the given o

  • Springboot @Value获取值为空问题解决方案

    这篇文章主要介绍了Springboot @Value获取值为空问题解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在spring中,常常使用 @Value("${property}") 从application.properties中取值,需要注意两点 使用 @Value 的类不能使用 new 关键字进行实例化对象,必须采用 依赖注入的方式进行实例化 不能使用显式的构造方法 否则,将取不到值.解决方法如下: 删除显式的构造方法

  • 微信小程序获取公众号文章列表及显示文章的示例代码

    微信小程序中如何打开公众号中的文章,步骤相对来说不麻烦. 1.公众号设置 小程序若要获取公众号的素材,公众号需要做一些设置. 1.1 绑定小程序 公众号需要绑定目标小程序,否则无法打开公众号的文章. 在公众号管理界面,点击小程序管理 --> 关联小程序 输入小程序的AppID搜索,绑定即可. 1.2 公众号开发者功能配置 (1) 在公众号管理界面,点击开发模块中的基本配置选项. (2) 开启开发者秘密(AppSecret),注意保存改秘密. (3) 设置ip白名单,这个就是发起请求的机器的外网i

随机推荐