Kafka producer端开发代码实例

一、producer工作流程

  producer使用用户启动producer的线程,将待发送的消息封装到一个ProducerRecord类实例,然后将其序列化之后发送给partitioner,再由后者确定目标分区后一同发送到位于producer程序中的一块内存缓冲区中。而producer的另外一个线程(Sender线程)则负责实时从该缓冲区中提取出准备就绪的消息封装进一个批次(batch),统一发送给对应的broker,具体流程如下图:

二、producer示例程序开发

  首先引入kafka相关依赖,在pom.xml文件中加入如下依赖:

<!--kafka-->
  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.12</artifactId>
   <version>2.2.0</version>
  </dependency>

  在resources下面创建kafka-producer.properties配置文件,用于设置kafka参数,内容如下:

bootstrap.servers=192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=-1
retries=3
batch.size=323840
linger.ms=10
buffer.memory=33554432
max.block.ms=3000

  其中,前三个参数必须明确指定,因为这三个参数没有默认值(注:kafka的producer参数配置可以参考http://kafka.apache.org/documentation/),然后编写producer发送消息的代码:

/**
   * Kafka发送消息测试
   * @throws IOException
   */
  public void sendMsg() throws IOException {
    //1.构造properties对象
    Properties properties = new Properties();
    FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties");
    properties.load(fileInputStream);
    fileInputStream.close();
    //2.构造kafkaProducer对象
    KafkaProducer producer = new KafkaProducer(properties);
    for (int i = 0; i < 100; i++) {
      //3.构造待发送消息的producerRecord对象,并指定消息要发送到哪个topic,消息的key和value
      ProducerRecord testTopic = new ProducerRecord("testTopic", Integer.toString(i), Integer.toString(i));
      //4.调用kafkaProducer对象的send方法发送消息
      producer.send(testTopic);
    }
    //5.关闭kafkaProducer
    producer.close();
  }

  然后登陆kafka所在服务器,执行以下命令监听消息: 

cd /usr/local/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic testTopic --from-beginning

  运行sendMsg方法,注意观察消费端,

  

  可以看到有0-99之间的数字依次被消费到,说明消息发送成功。

三、异步和同步发送消息

  上面发送消息的示例程序中,没有对发送结果进行处理,如果消息发送失败我们也是无法得知的,这种方法在实际应用中是不推荐的。在实际使用场景中,一般使用异步和同步两种常见发送方式。Java版本producer的send方法会返回一个Future对象,如果调用Future.get()方法就会无限等待返回结果,实现同步发送的效果,否则就是异步发送。

  1.异步发送消息

  Java版本producer的send()方法提供了回调类参数来实现异步发送以及对发送结果进行的响应,具体代码如下:

/**
   * 异步发送消息
   *
   * @throws IOException
   */
  public void sendMsg() throws IOException {
    //1.构造properties对象
    Properties properties = new Properties();
    FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties");
    properties.load(fileInputStream);
    fileInputStream.close();
    //2.构造kafkaProducer对象
    KafkaProducer producer = new KafkaProducer(properties);
    for (int i = 0; i < 100; i++) {
      //3.构造待发送消息的producerRecord对象,并指定消息要发送到哪个topic,消息的key和value
      ProducerRecord testTopic = new ProducerRecord("testTopic", Integer.toString(i), Integer.toString(i));
      //4.调用kafkaProducer对象的send方法发送消息,传入Callback回调参数
      producer.send(testTopic, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
          if (null == exception) {
            //消息发送成功后的处理
            System.out.println("消息发送成功");
          } else {
            //消息发送失败后的处理
            System.out.println("消息发送失败");
          }
        }
      });
    }
    //5.关闭kafkaProducer
    producer.close();
  }

  以上代码中,send方法第二个参数传入一个匿名内部类对象,也可以传入实现了org.apache.kafka.clients.producer.Callback接口的类对象。同时onCompletion方法的两个入参recordMetadata和exception不会同时为空,当消息发送成功后,exception为null,消息发送失败后recordMetadata为null。因此可以按照两个入参进行成功和失败逻辑的处理。

  其次,Kafka发送消息失败的类型包含两类,可重试异常和不可重试异常。所有的可重试异常都继承自org.apache.kafka.common.errors.RetriableException抽象类,理论上所有没有继承RetriableException 类的其他异常都属于不可重试异常,鉴于此,可以在消息发送失败后,按照是否可以重试,来进行不同的处理逻辑处理:

//4.调用kafkaProducer对象的send方法发送消息
producer.send(testTopic, new Callback() {
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
    if (null == exception) {
      //消息发送成功后的处理
      System.out.println("消息发送成功");
    } else {
      if(exception instanceof RetriableException){
        // 可重试异常
        System.out.println("可重试异常");
      }else{
        // 不可重试异常
        System.out.println("不可重试异常");
      }
    }
  }
});

  2.同步发送消息

  同步发送和异步发送是通过Java的Futrue来区分的,调用Future.get()无限等待结果返回,即实现了同步发送的结果,具体代码如下:

// 发送消息
 Future future = producer.send(testTopic);
 try {
   // 调用get方法等待结果返回,发送失败则会抛出异常
   future.get();
 } catch (Exception e) {
   System.out.println("消息发送失败");
 }

四、其他高级特性

1.消息分区机制

  kafka producer提供了分区策略以及分区器(partitioner)用于确定将消息发送到指定topic的哪个分区中。默认分区器根据murmur2算法计算消息key的哈希值,然后对总分区数求模确认消息要被发送的目标分区号(这点让我想起了redis集群中key值的分配方法),这样就确保了相同key的消息被发送到相同分区。若消息没有key值,将采用轮询的方式确保消息在topic的所有分区上均匀分配。

  除了使用kafka默认的分区机制,也可以通过实现org.apache.kafka.clients.producer.Partitioner接口来自定义分区器,此时需要在构造KafkaProducer的 properties中增加partitioner.class来指明分区器实现类,如:partitioner.class=com.demo.service.CustomerPartitioner。

2.消息序列化

  在本篇开始的producer示例程序中,在构造KafkaProducer对象的时候,有两个配置项

  • key.serializer=org.apache.kafka.common.serialization.StringSerializer
  • value.serializer=org.apache.kafka.common.serialization.StringSerializer

分别用于配置消息key和value的序列化方式为String类型,除此之外,Kafka中还提供了如下默认的序列化器:

  ByteArraySerializer:本质上什么也不做,因为网络中传输就是以字节传输的;

  ByteBufferSerializer:序列化ByteBuffer消息;

  BytesSerializer:序列化kafka自定义的Bytes类型;

  IntegerSerializer:序列化Integer类型;

  DoubleSerializer:序列化Double类型;

  LongSerializer:序列化Long类型;

  如果要自定义序列化器,则需要实现org.apache.kafka.common.serialization.Serializer接口,并且将key.serializer和value.serializer配置为自定义的序列化器。

3.消息压缩

  消息压缩可以显著降低磁盘占用以及带宽占用,从而有效提升I/O密集型应用性能,但是引入压缩同时会消耗额外的CPU,因此压缩是I/O性能和CPU资源的平衡。kafka目前支持3种压缩算法:CZIP,Snappy和LZ4,性能测试结果显示三种压缩算法的性能如下:LZ4>>Snappy>GZIP,目前启用LZ4进行消息压缩的producer的吞吐量是最高的。

  默认情况下Kafka是不压缩消息的,但是可以通过在创建KafkaProducer 对象的时候设置producer端参数compression.type来开启消息压缩,如配置compression.type=LZ4。那么什么时候开启压缩呢?首先判断是否启用压缩的依据是I/O资源消耗与CPU资源消耗的对比,如果环境上I/O资源非常紧张,比如producer程序占用了大量的网络带宽或broker端的磁盘占用率很高,而producer端的CPU资源非常富裕,那么就可以考虑为producer开启压缩。

4.无消息丢失配置

  在使用KafkaProducer.send()方法发送消息的时候,其实是把消息放入缓冲区中,再由一个专属I/O线程负责从缓冲区提取消息并封装消息到batch中,然后再发送出去。如果在I/O线程将消息发送出去之前,producer奔溃了,那么所有的消息都将丢失。同时,存在多消息发送时候由于网络抖动导致消息乱序的问题,为了解决这两个问题,可以通过在producer端以及broker端进行配置进行避免。

4.1 producer端配置

  max.block.ms=3000:设置block的时长,当缓冲区被填满或者metadata丢失时产生block,停止接收新的消息;

  acks=all:等待所有follower都响应了发送消息认为消息发送成功;

  retries=Integer.MAX_VALUE:设置重试次数,设置一个比较大的值可以保证消息不丢失;

  max.in.flight.requests.per.connection=1:限制producer在单个broker连接上能够发送的未响应请求的数量,从而防止同topic统一分区下消息乱序问题;

  除了设置以上参数之外,在发送消息的时候,应该尽量使用带有回调参数的send方法来处理发送结果,如果数据发送失败,则显示调用KafkaProducer.close(0)方法来立即关闭producer,防止消息乱序。

4.2 broker端配置

  unclean.leader.election.enable=false:关闭unclean leader选举,即不允许非ISR中的副本被选举为leader;

  replication.factor>=3:至少使用3个副本保存数据;

  min.issync.replicas>1:控制某条消息至少被写入到ISR中多少个副本才算成功,当且仅当producer端acks参数设置为all或者-1时,该参数才有效。

  最后,确保replication.factor>min.issync.replicas,如果两者相等,那么只要有一个副本挂掉,分区就无法工作,推荐配置replication.factor=min.issync.replicas+1。

  关于producer端的开发就介绍到这儿,下一篇将介绍consumer端的开发。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • docker-compose部署zk+kafka+storm集群的实现

    集群部署总览 172.22.12.20 172.22.12.21 172.22.12.22 172.22.12.23 172.22.12.24 zoo1:2181 zoo2:2182 zoo3:2183 zkui:9090 (admin/manager) kafka1:9092 kafka2:9092 kafka3:9092 kafdrop:9000 influxdb:8086 grafana:3000 (admin/chanhu) storm-nimbus1 storm-nimbus2 sto

  • Kafka Java Producer代码实例详解

    根据业务需要可以使用Kafka提供的Java Producer API进行产生数据,并将产生的数据发送到Kafka对应Topic的对应分区中,入口类为:Producer Kafka的Producer API主要提供下列三个方法: public void send(KeyedMessage<K,V> message) 发送单条数据到Kafka集群 public void send(List<KeyedMessage<K,V>> messages) 发送多条数据(数据集)到

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • Springboot 1.5.7整合Kafka-client代码示例

    在一次项目中,因甲方需要使用kafka消息队列推送数据,所以需要接入kafka,并且kafka的版本是2.11.但是我们项目使用的是Springboot 1.5.7的版本,对应的springboot.kafka.starter有冲突,所以就接入了kafka-client. Kafka 是一个分布式消息引擎与流处理平台,经常用做企业的消息总线.实时数据管道,有的还把它当做存储系统来使用. 早期 Kafka 的定位是一个高吞吐的分布式消息系统,目前则演变成了一个成熟的分布式消息引擎,以及流处理平台.

  • Kafka多节点分布式集群搭建实现过程详解

    上一篇分享了单节点伪分布式集群搭建方法,本篇来分享一下多节点分布式集群搭建方法.多节点分布式集群结构如下图所示: 为了方便查阅,本篇将和上一篇一样从零开始一步一步进行集群搭建. 一.安装Jdk 具体安装步骤可参考linux安装jdk. 二.安装与配置zookeeper 下载地址:https://www-us.apache.org/dist/zookeeper/stable/ 下载二进制压缩包zookeeper-3.4.14.tar.gz,然后上传到linux服务器指定目录下,本次上传目录为/so

  • Python confluent kafka客户端配置kerberos认证流程详解

    kafka的认证方式一般有如下3种: 1.SASL/GSSAPI 从版本0.9.0.0开始支持 2.SASL/PLAIN 从版本0.10.0.0开始支持 3.SASL/SCRAM-SHA-256 以及 SASL/SCRAM-SHA-512 从版本0.10.2.0开始支持 其中第一种SASL/GSSAPI的认证就是kerberos认证,对于java来说有原生的支持,但是对于python来说配置稍微麻烦一些,下面说一下具体的配置过程,confluent kafka模块底层依赖于librdkafka,

  • Kafka单节点伪分布式集群搭建实现过程详解

    Kafka集群搭建分为单节点的伪分布式集群和多节点的分布式集群两种,首先来看一下单节点伪分布式集群安装.单节点伪分布式集群是指集群由一台ZooKeeper服务器和一台Kafka broker服务器组成,如下图所示: 为了搭建单节点Kafka集群,需要依次安装如下软件:安装Java-->安装ZooKeeper-->安装Kafka. 一.安装Java 可以参考linux安装jdk,来进行安装 二.安装ZooKeeper 下载地址:https://www-us.apache.org/dist/zoo

  • Spring boot 整合KAFKA消息队列的示例

    这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务. 一.添加依赖和添加配置项 1.1.在 Pom 文件中添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 1.2.添加配置项 spring: kafka: b

  • Kafka producer端开发代码实例

    一.producer工作流程 producer使用用户启动producer的线程,将待发送的消息封装到一个ProducerRecord类实例,然后将其序列化之后发送给partitioner,再由后者确定目标分区后一同发送到位于producer程序中的一块内存缓冲区中.而producer的另外一个线程(Sender线程)则负责实时从该缓冲区中提取出准备就绪的消息封装进一个批次(batch),统一发送给对应的broker,具体流程如下图: 二.producer示例程序开发 首先引入kafka相关依赖

  • Java实现TCP/IP协议的收发数据(服务端)代码实例

    这篇文章主要介绍了Java实现TCP/IP协议的收发数据(服务端)代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 功能如下: 注: 只有服务端,没有客户端,测试时采用第三方软件作为客户端的. 收发数据目前能正常收发数据,只是中文的会变成乱码显示. 采用Thread类实现一个收发数据的线程. 服务端代码: import java.io.IOException; import java.io.InputStream; import java

  • Spring整合Quartz开发代码实例

    我们使用Spring整合Quartz开发,本实例采用数据库模式的demo. xml文件配置如下: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns

  • 基于Electron实现桌面应用开发代码实例

    Electron是一个可以使用 JavaScript,HTML 和 CSS 构建跨平台桌面应用程序的开源框架. 本文主要分享一下采用vue + electron开发桌面程序的搭建过程. 1. 环境准备 这里采用的是vue-cli3.x,可以通过下面的指令查看当前vue-cli的版本: vue --version# 3.9.3 这里我用的是3.9.3 如果没有装vue-cli可以通过下面的命令安装: npm install -g @vue/cli 如果是vue-cli还是2.x可以先卸载2.x然后

  • Spring纯Java配置集成kafka代码实例

    这篇文章主要介绍了Spring纯Java配置集成kafka代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 KafkaConfig.java package com.niugang.config; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache

  • vue+mockjs模拟数据实现前后端分离开发的实例代码

    本文介绍了vue+mockjs模拟数据实现前后端分离开发的实例代码,分享给大家,也给自己留个笔记. 在项目中尝试了mockjs,mock数据,实现前后端分离开发. 关于mockjs,官网描述的是 1.前后端分离 2.不需要修改既有代码,就可以拦截 Ajax 请求,返回模拟的响应数据. 3.数据类型丰富 4.通过随机数据,模拟各种场景. 等等优点. 总结起来就是在后端接口没有开发完成之前,前端可以用已有的接口文档,在真实的请求上拦截ajax,并根据mockjs的mock数据的规则,模拟真实接口返回

  • vue实现PC端录音功能的实例代码

    录音功能一般来说在移动端比较常见,但是在pc端也要实现按住说话的功能呢?项目需求:按住说话,时长不超过60秒,生成语音文件并上传,我这里用的是recorder.js 1.项目中新建一个recorder.js文件,内容如下,也可在百度上直接搜一个 // 兼容 window.URL = window.URL || window.webkitURL navigator.getUserMedia = navigator.getUserMedia || navigator.webkitGetUserMed

  • asp.net 仿微信端菜单设置实例代码详解

    第一步:添加引用文件 <link rel="stylesheet" href="~/assets/css/bootstrap.min.css" rel="external nofollow" > <link rel="stylesheet" href="~/assets/css/font-awesome.min.css" rel="external nofollow"

  • Python TCP通信客户端服务端代码实例

    这篇文章主要介绍了Python TCP通信客户端服务端代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 TCP客户端: import argparse, socket,sys import time # socket.setdefaulttimeout(20) def recvall(sock, length): data = b'' while len(data) < length: more = sock.recv(length -

随机推荐