Java实现Kafka生产者和消费者的示例

Kafka简介

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。

方式一:kafka-clients

引入依赖

在pom.xml文件中,引入kafka-clients依赖:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.3.1</version>
</dependency>

生产者

创建一个KafkaProducer的生产者实例:

@Configuration
public class Config {

  public final static String bootstrapServers = "127.0.0.1:9092";

  @Bean(destroyMethod = "close")
  public KafkaProducer<String, String> kafkaProducer() {
    Properties props = new Properties();
    //设置Kafka服务器地址
    props.put("bootstrap.servers", bootstrapServers);
    //设置数据key的序列化处理类
    props.put("key.serializer", StringSerializer.class.getName());
    //设置数据value的序列化处理类
    props.put("value.serializer", StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    return producer;
  }
}

在Controller中进行使用:

@RestController
@Slf4j
public class Controller {

  @Autowired
  private KafkaProducer<String, String> kafkaProducer;

  @RequestMapping("/kafkaClientsSend")
  public String send() {
    String uuid = UUID.randomUUID().toString();
    RecordMetadata recordMetadata = null;
    try {
     //将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
      recordMetadata = kafkaProducer.send(new ProducerRecord<>("one-more-topic", uuid)).get();
      log.info("recordMetadata: {}", recordMetadata);
      log.info("uuid: {}", uuid);
    } catch (Exception e) {
      log.error("send fail, uuid: {}", uuid, e);
    }
    return uuid;
  }
}

消费者

创建一个KafkaConsumer的消费者实例:

@Configuration
public class Config {

  public final static String groupId = "kafka-clients-group";
  public final static String bootstrapServers = "127.0.0.1:9092";

  @Bean(destroyMethod = "close")
  public KafkaConsumer<String, String> kafkaConsumer() {
    Properties props = new Properties();
    //设置Kafka服务器地址
    props.put("bootstrap.servers", bootstrapServers);
    //设置消费组
    props.put("group.id", groupId);
    //设置数据key的反序列化处理类
    props.put("key.deserializer", StringDeserializer.class.getName());
    //设置数据value的反序列化处理类
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    //订阅名称为“one-more-topic”的Topic的消息
    kafkaConsumer.subscribe(Arrays.asList("one-more-topic"));
    return kafkaConsumer;
  }
}

在Controller中进行使用:

@RestController
@Slf4j
public class Controller {

  @Autowired
  private KafkaConsumer<String, String> kafkaConsumer;

  @RequestMapping("/receive")
  public List<String> receive() {
   从Kafka服务器中的名称为“one-more-topic”的Topic中消费消息
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    List<String> messages = new ArrayList<>(records.count());
    for (ConsumerRecord<String, String> record : records.records("one-more-topic")) {
      String message = record.value();
      log.info("message: {}", message);
      messages.add(message);
    }
    return messages;
  }
}

方式二:spring-kafka

使用kafka-clients需要我们自己创建生产者或者消费者的bean,如果我们的项目基于SpringBoot构建,那么使用spring-kafka就方便多了。

引入依赖

在pom.xml文件中,引入spring-kafka依赖:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.3.12.RELEASE</version>
</dependency>

生产者

在application.yml文件中增加配置:

spring:
 kafka:
  #Kafka服务器地址
  bootstrap-servers: 127.0.0.1:9092
  producer:
   #设置数据value的序列化处理类
   value-serializer: org.apache.kafka.common.serialization.StringSerializer

在Controller中注入KafkaTemplate就可以直接使用了,代码如下:

@RestController
@Slf4j
public class Controller {

  @Autowired
  private KafkaTemplate<String, String> template;

  @RequestMapping("/springKafkaSend")
  public String send() {
    String uuid = UUID.randomUUID().toString();
    //将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
    this.template.send("one-more-topic", uuid);
    log.info("uuid: {}", uuid);
    return uuid;
  }
}

消费者

在application.yml文件中增加配置:

spring:
 kafka:
  #Kafka服务器地址
  bootstrap-servers: 127.0.0.1:9092
  consumer:
   #设置数据value的反序列化处理类
   value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

创建一个可以被Spring框架扫描到的类,并且在方法上加上@KafkaListener注解,就可以消费消息了,代码如下:

@Component
@Slf4j
public class Receiver {

  @KafkaListener(topics = "one-more-topic", groupId = "spring-kafka-group")
  public void listen(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
      String message = (String) kafkaMessage.get();
      log.info("message: {}", message);
    }
  }
}

到此这篇关于Java实现Kafka生产者和消费者的示例的文章就介绍到这了,更多相关Java Kafka生产者和消费者 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • kafka生产者和消费者的javaAPI的示例代码

    写了个kafka的java demo 顺便记录下,仅供参考 1.创建maven项目 目录如下: 2.pom文件: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://mave

  • Java实现Kafka生产者消费者代码实例

    Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者. 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键.值进行保存. 每一个Topic中都包含一个或多个物理分区(Partition),分区维护着消息的内容和索引,它们有可能被保存在不同服务器. 新建一个Maven项目,pom.xml 加入依赖: <dependency> <groupId>org.apache.kafka</gro

  • Java实现Kafka生产者和消费者的示例

    Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka的目标是为处理实时数据提供一个统一.高吞吐.低延迟的平台. 方式一:kafka-clients 引入依赖 在pom.xml文件中,引入kafka-clients依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId

  • 浅谈java线程中生产者与消费者的问题

    一.概念 生产者与消费者问题是一个金典的多线程协作的问题.生产者负责生产产品,并将产品存放到仓库:消费者从仓库中获取产品并消费.当仓库满时,生产者必须停止生产,直到仓库有位置存放产品:当仓库空时,消费者必须停止消费,直到仓库中有产品. 解决生产者/消费者问题主要用到如下几个技术:1.用线程模拟生产者,在run方法中不断地往仓库中存放产品.2.用线程模拟消费者,在run方法中不断地从仓库中获取产品.3  . 仓库类保存产品,当产品数量为0时,调用wait方法,使得当前消费者线程进入等待状态,当有新

  • PHP基于rabbitmq操作类的生产者和消费者功能示例

    本文实例讲述了PHP基于rabbitmq操作类的生产者和消费者功能.分享给大家供大家参考,具体如下: 注意事项: 1.accept.php消费者代码需要在命令行执行 2.'username'=>'asdf','password'=>'123456' 改成自己的帐号和密码 RabbitMQCommand.php操作类代码 <?php /* * amqp协议操作类,可以访问rabbitMQ * 需先安装php_amqp扩展 */ class RabbitMQCommand{ public $

  • 浅谈Java中生产者与消费者问题的演变

    想要了解更多关于Java生产者消费者问题的演变吗?那就看看这篇文章吧,我们分别用旧方法和新方法来处理这个问题. 生产者消费者问题是一个典型的多进程同步问题. 对于大多数人来说,这个问题可能是我们在学校,执行第一次并行算法所遇到的第一个同步问题. 虽然它很简单,但一直是并行计算中的最大挑战 - 多个进程共享一个资源. 问题陈述 生产者和消费者两个程序,共享一个大小有限的公共缓冲区. 假设一个生产者"生产"一份数据并将其存储在缓冲区中,而一个消费者"消费"这份数据,并将

  • Java多线程并发生产者消费者设计模式实例解析

    一.两个线程一个生产者一个消费者 需求情景 两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个. 涉及问题 同步问题:如何保证同一资源被多个线程并发访问时的完整性.常用的同步方法是采用标记或加锁机制. wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制. wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行. not

  • java开发RocketMQ生产者高可用示例详解

    目录 引言 1 消息 1.1 topic 1.2 Body 1.3 tag 1.4 key 1.5 延迟级别 2 生产者高可用 2.1 客户端保证生产者高可用 2.1.1 重试机制 2.1.2 客户端容错 2.2 Broker端保证生产者高可用 引言 前边两章说了点基础的,从这章开始,我们挖挖源码.看看RocketMQ是怎么工作的. 首先呢,这个生产者就是送孩子去码头的家长,孩子们呢,就是消息了. 我们看看消息孩子们都长啥样. 1 消息 public class Message implemen

  • Java Kafka实现延迟队列的示例代码

    目录 基于kafka如何实现延迟队列 完善细节 Java代码实现 还需要做什么 kafka作为一个使用广泛的消息队列,很多人都不会陌生,但当你在网上搜索“kafka 延迟队列”,出现的都是一些讲解时间轮或者只是提供了一些思路,并没有一份真实可用的代码实现,今天我们就来打破这个现象,提供一份可运行的代码,抛砖引玉,吸引更多的大神来分享. 基于kafka如何实现延迟队列 想要解决一个问题,我们需要先分解问题.kafka作为一个高性能的消息队列,只要消费能力足够,发出的消息都是会立刻收到的,因此我们需

  • JAVA多线程实现生产者消费者的实例详解

    JAVA多线程实现生产者消费者的实例详解 下面的代码实现了生产者消费者的问题 Product.Java package consumerProducer; public class Product { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } public Product(String id) { this.id=id; } publ

随机推荐