Java API方式调用Kafka各种协议的方法

众所周知,Kafka自己实现了一套二进制协议(binary protocol)用于各种功能的实现,比如发送消息,获取消息,提交位移以及创建topic等。具体协议规范参见:Kafka协议  这套协议的具体使用流程为:

1.客户端创建对应协议的请求

2.客户端发送请求给对应的broker

3.broker处理请求,并发送response给客户端

虽然Kafka提供的大量的脚本工具用于各种功能的实现,但很多时候我们还是希望可以把某些功能以编程的方式嵌入到另一个系统中。这时使用Java API的方式就显得异常地灵活了。本文我将尝试给出Java API底层框架的一个范例,同时也会针对“创建topic”和“查看位移”这两个主要功能给出对应的例子。 需要提前说明的是,本文给出的范例并没有考虑Kafka集群开启安全的情况。另外Kafka的KIP4应该一直在优化命令行工具以及各种管理操作,有兴趣的读者可以关注这个KIP。

本文中用到的API依赖于kafka-clients,所以如果你使用Maven构建的话,请加上:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.0</version>
</dependency>

如果是gradle,请加上:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底层框架

/**
   * 发送请求主方法
   * @param host     目标broker的主机名
   * @param port     目标broker的端口
   * @param request    请求对象
   * @param apiKey    请求类型
   * @return       序列化后的response
   * @throws IOException
   */
  public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
    Socket socket = connect(host, port);
    try {
      return send(request, apiKey, socket);
    } finally {
      socket.close();
    }
  }

  /**
   * 发送序列化请求并等待response返回
   * @param socket      连向目标broker的socket
   * @param request      序列化后的请求
   * @return         序列化后的response
   * @throws IOException
   */
  private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
    sendRequest(socket, request);
    return getResponse(socket);
  }

  /**
   * 发送序列化请求给socket
   * @param socket      连向目标broker的socket
   * @param request      序列化后的请求
   * @throws IOException
   */
  private void sendRequest(Socket socket, byte[] request) throws IOException {
    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
    dos.writeInt(request.length);
    dos.write(request);
    dos.flush();
  }

  /**
   * 从给定socket处获取response
   * @param socket      连向目标broker的socket
   * @return         获取到的序列化后的response
   * @throws IOException
   */
  private byte[] getResponse(Socket socket) throws IOException {
    DataInputStream dis = null;
    try {
      dis = new DataInputStream(socket.getInputStream());
      byte[] response = new byte[dis.readInt()];
      dis.readFully(response);
      return response;
    } finally {
      if (dis != null) {
        dis.close();
      }
    }
  }

  /**
   * 创建Socket连接
   * @param hostName     目标broker主机名
   * @param port       目标broker服务端口, 比如9092
   * @return         创建的Socket连接
   * @throws IOException
   */
  private Socket connect(String hostName, int port) throws IOException {
    return new Socket(hostName, port);
  }

  /**
   * 向给定socket发送请求
   * @param request    请求对象
   * @param apiKey    请求类型, 即属于哪种请求
   * @param socket    连向目标broker的socket
   * @return       序列化后的response
   * @throws IOException
   */
  private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
    RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);
    byte[] serializedRequest = buffer.array();
    byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
    ByteBuffer responseBuffer = ByteBuffer.wrap(response);
    ResponseHeader.parse(responseBuffer);
    return responseBuffer;
  }

有了这些方法的铺垫,我们就可以创建具体的请求了。

创建topic

/**
   * 创建topic
   * 由于只是样例代码,有些东西就硬编码写到程序里面了(比如主机名和端口),各位看官自行修改即可
   * @param topicName       topic名
   * @param partitions      分区数
   * @param replicationFactor   副本数
   * @throws IOException
   */
  public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
    Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
    // 插入多个元素便可同时创建多个topic
    topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
    int creationTimeoutMs = 60000;
    CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
    CreateTopicsResponse.parse(response, request.version());
  }

查看位移

/**
   * 获取某个consumer group下的某个topic分区的位移
   * @param groupID      group id
   * @param topic       topic名
   * @param parititon     分区号
   * @throws IOException
   */
  public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
    TopicPartition tp = new TopicPartition(topic, parititon);
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp))
        .setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
    System.out.println(partitionData.offset);
  }
/**
   * 获取某个consumer group下所有topic分区的位移信息
   * @param groupID      group id
   * @return         (topic分区 --> 分区信息)的map
   * @throws IOException
   */
  public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    return resp.responseData();
  }

okay, 上面就是“创建topic”和“查看位移”的样例代码,各位看官可以参考着这两个例子构建其他类型的请求。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Kafka使用Java客户端进行访问的示例代码

    本文环境如下: 操作系统:CentOS 6 32位 JDK版本:1.8.0_77 32位 Kafka版本:0.9.0.1(Scala 2.11) 1. maven依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependen

  • Java API方式调用Kafka各种协议的方法

    众所周知,Kafka自己实现了一套二进制协议(binary protocol)用于各种功能的实现,比如发送消息,获取消息,提交位移以及创建topic等.具体协议规范参见:Kafka协议  这套协议的具体使用流程为: 1.客户端创建对应协议的请求 2.客户端发送请求给对应的broker 3.broker处理请求,并发送response给客户端 虽然Kafka提供的大量的脚本工具用于各种功能的实现,但很多时候我们还是希望可以把某些功能以编程的方式嵌入到另一个系统中.这时使用Java API的方式就显

  • 使用Android studio3.6的java api方式调用opencv

    基本环境: Android studio3.6 NDK:r15c(尽量使用该版本) Opencv3.4.1 android sdk 操作: (1)新建工程,选择Empty Activity,工程名为op (2)File->New->Import Module,然后选择自己的java-opencv的相对应路径,比如,D:\Android\OpenCV-android-sdk\sdk\java (3)修改openCVlibrary341下面的build.gradle中的,compileSdkVer

  • Java基于socket服务实现UDP协议的方法

    本文实例讲述了Java基于socket服务实现UDP协议的方法.分享给大家供大家参考.具体如下: 示例1: 接收类: package com.socket.demo; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; public class UDPReceiveDemo { public static void main(String[] args) throw

  • java通过jni调用opencv处理图像的方法

    1. 建立java文件 public class getImageFeature { static{ System.loadLibrary("getImageFeatureDll"); } public native int getImageFeatureByName(String filename); public native int getImageFeatureByMemory(); public static void main(String[] args) { getIma

  • java使用jna调用c#中dll的方法详解

    前言 JNA(Java Native Access )提供一组Java工具类用于在运行期动态访问系统本地库(native library:如Window的dll)而不需要编写任何Native/JNI代码.开发人员只要在一个java接口中描述目标native library的函数与结构,JNA将自动实现Java接口到native function的映射. 优点 JNA可以让你像调用一般java方法一样直接调用本地方法.就和直接执行本地方法差不多,而且调用本地方法还不用额外的其他处理或者配置什么的,

  • hbase访问方式之java api

    Hbase的访问方式 1.Native Java API:最常规和高效的访问方式: 2.HBase Shell:HBase的命令行工具,最简单的接口,适合HBase管理使用: 3.Thrift Gateway:利用Thrift序列化技术,支持C++,PHP,Python等多种语言,适合其他异构系统在线访问HBase表数据: 4.REST Gateway:支持REST 风格的Http API访问HBase, 解除了语言限制: 5.MapReduce:直接使用MapReduce作业处理Hbase数据

  • HDFS的Java API的访问方式实例代码

    本文研究的主要是HDFS的Java API的访问方式,具体代码如下所示,有详细注释. 最近的节奏有点儿快,等有空的时候把这个封装一下 实现代码 要导入的包: import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation

  • Java利用httpclient通过get、post方式调用https接口的方法

    通过httpclient的get post方式调用http很常见.一般都是 HttpClient client = new DefaultHttpClient(); HttpPost post = new HttpPost(http://127.0.0.1/login); 但是如果要调用https这个方式就不行了.就要修改DefaultHttpClient <dependency> <groupId>org.apache.httpcomponents</groupId>

  • Java 使用反射调用jar包中的类方式

    下面讲展示的是从image.jar包中调用image.Buddy类的方法. public class Main { public static void main(String[] args) { try { // 两种方式都可以 URL url = new File("/Users/wuchen/Documents/IntelliJIDEA/Applet/out/production/Applet/image.jar").toURI().toURL(); // URL url = n

  • ChatGPT介绍及Java API调用

    ChatGPT的基本介绍 ChatGPT是一个用来进行自然语言处理任务的预训练模型.要使用ChatGPT,需要了解以下几点: 理解预训练模型:预训练模型是通过在大量数据上训练得到的模型,可以在各种自然语言处理任务上进行迁移学习. API使用:OpenAI提供了一个API,可以通过发送请求来生成回答. 请求格式:请求必须包含模型的提问,并且可以包含上下文信息. 响应格式:响应将包含模型的回答,以及一些其他的信息,例如回答的概率等. 如果您需要进一步的指导,请查看OpenAI的API文档和代码示例.

随机推荐