Java kafka如何实现自定义分区类和拦截器

生产者发送到对应的分区有以下几种方式:

(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)

(2)未指定patition但指定key,通过对key的value进行hash出一个patition;

(3)patition和key都未指定,使用轮询选出一个patition。

但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:

1、实现一个自定义分区类,CustomPartitioner实现Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

  /**
   *
   * @param topic 当前的发送的topic
   * @param key  当前的key值
   * @param keyBytes 当前的key的字节数组
   * @param value 当前的value值
   * @param valueBytes 当前的value的字节数组
   * @param cluster
   * @return
   */
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //这边根据返回值就是分区号, 这边就是固定发送到三号分区
    return 3;
  }

  @Override
  public void close() {

  }
  @Override
  public void configure(Map<String, ?> configs) {

  }

}

2、producer配置文件指定,具体的分区类

// 具体的分区类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer拦截器

拦截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。

许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。

所使用的类为:

org.apache.kafka.clients.producer.ProducerInterceptor

我们可以编码测试下:

1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.UUID;

public class MessageInterceptor implements ProducerInterceptor<String, String> {

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("这是MessageInterceptor的configure方法");
  }

  /**
   * 这个是消息发送之前进行处理
   *
   * @param record
   * @return
   */
  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    // 创建一个新的record,把uuid入消息体的最前部
    System.out.println("为消息添加uuid");
    return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
        UUID.randomUUID().toString().replace("-", "") + "," + record.value());
  }

  /**
   * 这个是生产者回调函数调用之前处理
   * @param metadata
   * @param exception
   */
  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("MessageInterceptor拦截器的onAcknowledgement方法");
  }

  @Override
  public void close() {
    System.out.println("MessageInterceptor close 方法");
  }
}

2、定义计数拦截器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{
  private int errorCounter = 0;
  private int successCounter = 0;

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("这是CounterInterceptor的configure方法");
  }

  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    System.out.println("CounterInterceptor计数过滤器不对消息做任何操作");
    return record;
  }

  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    // 统计成功和失败的次数
    System.out.println("CounterInterceptor过滤器执行统计失败和成功数量");
    if (exception == null) {
      successCounter++;
    } else {
      errorCounter++;
    }
  }

  @Override
  public void close() {
    // 保存结果
    System.out.println("Successful sent: " + successCounter);
    System.out.println("Failed sent: " + errorCounter);
  }
}

3、producer客户端:

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Producer1 {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Kafka服务端的主机名和端口号
    props.put("bootstrap.servers", "localhost:9092");
    // 等待所有副本节点的应答
    props.put("acks", "all");
    // 消息发送最大尝试次数
    props.put("retries", 0);
    // 一批消息处理大小
    props.put("batch.size", 16384);
    // 请求延时,可能生产数据太快了
    props.put("linger.ms", 1);
    // 发送缓存区内存大小,数据是先放到生产者的缓冲区
    props.put("buffer.memory", 33554432);
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 具体的分区类
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
    //定义拦截器
    List<String> interceptors = new ArrayList<>();
    interceptors.add("kafka.MessageInterceptor");
    interceptors.add("kafka.CounterInterceptor");
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 1; i++) {
      producer.send(new ProducerRecord<String, String>("test_0515", i + "", "xxx-" + i), new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
          System.out.println("这是producer回调函数");
        }
      });
    }
    /*System.out.println("现在执行关闭producer");
    producer.close();*/
    producer.close();
  }
}

总结,我们可以知道拦截器链各个方法的执行顺序,假如有A、B拦截器,在一个拦截器链中:

(1)执行A的configure方法,执行B的configure方法

(2)执行A的onSend方法,B的onSend方法

(3)生产者发送完毕后,执行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)执行producer自身的callback回调函数。

(5)执行A的close方法,B的close方法。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Boot集群管理工具KafkaAdminClient使用方法解析

    原理介绍 在Kafka官网中这么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准): 创建Topic:createTopics(Collection<NewTopic> newTopics) 删除Topic:deleteTopi

  • python3实现从kafka获取数据,并解析为json格式,写入到mysql中

    项目需求:将kafka解析来的日志获取到数据库的变更记录,按照订单的级别和订单明细级别写入数据库,一条订单的所有信息包括各种维度信息均保存在一条json中,写入mysql5.7中. 配置信息: [Global] kafka_server=xxxxxxxxxxx:9092 kafka_topic=mes consumer_group=test100 passwd = tracking port = 3306 host = xxxxxxxxxx user = track schema = track

  • kafka监控获取指定topic的消息总量示例

    我就废话不多说了,直接 上代码吧! import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataR

  • spring boot整合kafka过程解析

    这篇文章主要介绍了spring boot整合kafka过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.启动kafka 启动kafka之前一定要启动zookeeper,因为要使用kafka必须要使用zookeeper. windows环境下启动,直接使用kafka自带的zookeeper: E:\kafka_2.12-2.4.0\bin\windows zookeeper-server-start.bat ..\..\config\z

  • Spring boot集成Kafka消息中间件代码实例

    一.创建Spring boot项目,添加如下依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <ar

  • Java实现Kafka生产者消费者代码实例

    Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者. 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键.值进行保存. 每一个Topic中都包含一个或多个物理分区(Partition),分区维护着消息的内容和索引,它们有可能被保存在不同服务器. 新建一个Maven项目,pom.xml 加入依赖: <dependency> <groupId>org.apache.kafka</gro

  • spark通过kafka-appender指定日志输出到kafka引发的死锁问题

    在采用log4j的kafka-appender收集spark任务运行日志时,发现提交到yarn上的任务始终ACCEPTED状态,无法进入RUNNING状态,并且会重试两次后超时.期初认为是yarn资源不足导致,但在确认yarn资源充裕的时候问题依旧,而且基本上能稳定复现. 起初是这么配置spark日志输出到kafka的: log4j.rootCategory=INFO, console, kafka log4j.appender.console=org.apache.log4j.ConsoleA

  • php测试kafka项目示例

    本文实例讲述了php测试kafka项目.分享给大家供大家参考,具体如下: 概述 Kafka是最初由Linkedin公司开发,是一个分布式.分区的.多副本的.多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志.访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目. 主要应用场景是:日志收集系统和消息系统. 安装kafka-php项目依赖 composer require nmred/kafka-

  • Java kafka如何实现自定义分区类和拦截器

    生产者发送到对应的分区有以下几种方式: (1)指定了patition,则直接使用:(可以查阅对应的java api, 有多种参数) (2)未指定patition但指定key,通过对key的value进行hash出一个patition: (3)patition和key都未指定,使用轮询选出一个patition. 但是kafka提供了,自定义分区算法的功能,由业务手动实现分布: 1.实现一个自定义分区类,CustomPartitioner实现Partitioner import org.apache

  • Android 中okhttp自定义Interceptor(缓存拦截器)

    Android 中okhttp自定义Interceptor(缓存拦截器) 前言: 新公司项目是没有缓存的,我的天,坑用户流量不是么.不知道有人就喜欢一个界面没事点来点去的么.怎么办?一个字"加". 由于项目的网络请求被我换成了retrofit.而retrofit的网络请求默认基于okhttp okhttp的缓存由返回的header 来决定.如果服务器支持缓存的话返回的headers里面会有这一句 "Cache-Control","max-age=time&

  • Java通过动态代理实现一个简单的拦截器操作

    一.代理 在使用动态代理实现拦截器之前我们先简单了解一下什么Java的代理. 代理,顾名思义,就是不直接操作被代理(下面都用目标对象称呼,听起来舒服一些)对象,而是通过一个代理对象去间接的使用目标对象中的方法.代理分为两种模式,一种是静态代理,一种是动态代理.接下来先写一个静态代理的例子. 无论是静态代理还是动态代理,目标对象(target)都要实现一个接口(interface),注意,如果使用cglib提供的代理,则不必实现接口,而是通过子类去实现,暂不讨论该种方式. (1)先定义一个接口 p

  • Java的Struts2框架中拦截器使用的实例教程

    1.拦截器小介 拦截器的功能类似于web.xml文件中的Filter,能对用户的请求进行拦截,通过拦截用户的请求来实现对页面的控制.拦截器是在Struts-core-2.2.3.jar中进行配置的,原始的拦截器是在struts-default.xml中配置的,里面封存了拦截器的基本使用方法. Struts2拦截器功能类似于Servlet过滤器.在Action执行execute方法前,Struts2会首先执行struts.xml中引用的拦截器,如果有多个拦截器则会按照上下顺序依次执行,在执行完所有

  • struts2自定义拦截器的示例代码

    题目:使用struts2自定义拦截器,完成用户登陆才能访问权限的实现 在session中存放user变量表示用户登陆,若user为空则用户没有登陆,反之登陆 显示提示信息(请先登录) 定义拦截器 在struts.xml中定义拦截器使用标签<Intercaptors>.<Intercapter>. <interceptors> <interceptor name="test" class="Intercaptor.Intercaptor

  • SpringBoot整合Web之CORS支持与配置类和 XML配置及注册拦截器

    目录 本章概要 CORS 支持 1. 创建SpringBoot工程 2. 创建控制器 3. 配置跨域 4. 测试 配置类与 XML 配置 注册拦截器 本章概要 CORS 支持 配置类与 XML 配置 注册拦截器 CORS 支持 CORS (Cross-Origin Resource Sharing)是由 W3C 制定开发的一种跨域资源共享技术标准,其目的就是为了解决前端的跨域请求.在 Java EE 开发中,最常见的前端跨域请求解决方案是 JSONP ,但是 JSONP 只支持 GET 请求,而

  • 一文详解Java过滤器拦截器实例逐步掌握

    目录 一.过滤器与拦截器相同点 二.过滤器与拦截器区别 三.过滤器与拦截器的实现 四.过滤器与拦截器相关面试题 一.过滤器与拦截器相同点 1.拦截器与过滤器都是体现了AOP的思想,对方法实现增强,都可以拦截请求方法. 2.拦截器和过滤器都可以通过Order注解设定执行顺序 二.过滤器与拦截器区别 在Java Web开发中,过滤器(Filter)和拦截器(Interceptor)都是常见的用于在请求和响应之间进行处理的组件.它们的主要区别如下: 运行位置不同:过滤器是运行在Web服务器和Servl

  • Struts2学习教程之拦截器机制与自定义拦截器

    前言 拦截器体系是Struts2框架的重要组成部分,不夸张的说,没有拦截器体系,也就没有这么好用的Struts2框架了.在Struts2框架中,大量的拦截器完成了很多基础的功能,比如,params拦截器负责解析HTTP请求的参数,并设置Action的属性:servlet-config拦截器直接将HTTP请求中的HttpServletRequest实例和HttpServletResponse实例传给Action:fileUpload拦截器则负责解析请求参数中的文件域,并将一个文件域设置成Actio

  • java SpringMvc中拦截器的应用

    目录 什么是拦截器 基本使用 实例 总结 什么是拦截器 拦截器(Interceptor)是SpringMVC中的组件.可以使很多个请求被处理时,都会执行拦截器中的代码.拦截器可以选择阻止执行或放行. 举个栗子: 基本使用 在SpringMVC中实现HandlerInteceptor拦截器接口,这个类就是一个拦截器类. 利用拦截器最核心的在用控制preHandle方法的返回值,返回true就成功了,返回false就表示进行拦截处理了. 实例 首先,创建一个类继承拦截器 public class D

  • Java Kafka分区发送及消费实战

    目录 前言 业务场景 业务实现 不指定分区 指定分区 topic分区初始化及配置 生产者分区发送方案 消费者 前言 Kafka是现在非常热门的分布式消息队列,常用于微服务间异步通信,业务解耦等场景.kafka的性能非常强大,但是单个微服务吞吐性能是有上限的,我们就会用到分布式微服务,多消费者多生产者进行数据处理,保证性能同时也能根据业务量进行横向拓展,对于同一个微服务的多个实例,输入输出的topic是同一个,这时候我们就可以利用Kafka分区消费来解决这个问题. 业务场景 我们开发的是一个物联网

随机推荐