Java使用kafka发送和生产消息的示例

1. maven依赖包

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.9.0.1</version>
</dependency> 

2. 生产者代码

package com.lnho.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("bootstrap.servers", "master:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  Producer<String, String> producer = new KafkaProducer<>(props);
  for(int i = 0; i < 100; i++)
   producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));
  producer.close();
 }
}

3. 消费者代码

package com.lnho.example.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("bootstrap.servers", "master:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList("topic1"));
  while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
   for (ConsumerRecord<String, String> record : records)
    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
  }
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

您可能感兴趣的文章:

  • Kafka利用Java实现数据的生产和消费实例教程
  • Kafka使用Java客户端进行访问的示例代码
  • Java API方式调用Kafka各种协议的方法
(0)

相关推荐

  • Kafka利用Java实现数据的生产和消费实例教程

    前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. Kafka 有如下特性: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能. 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输. 支持Kafka Serv

  • Kafka使用Java客户端进行访问的示例代码

    本文环境如下: 操作系统:CentOS 6 32位 JDK版本:1.8.0_77 32位 Kafka版本:0.9.0.1(Scala 2.11) 1. maven依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependen

  • Java API方式调用Kafka各种协议的方法

    众所周知,Kafka自己实现了一套二进制协议(binary protocol)用于各种功能的实现,比如发送消息,获取消息,提交位移以及创建topic等.具体协议规范参见:Kafka协议  这套协议的具体使用流程为: 1.客户端创建对应协议的请求 2.客户端发送请求给对应的broker 3.broker处理请求,并发送response给客户端 虽然Kafka提供的大量的脚本工具用于各种功能的实现,但很多时候我们还是希望可以把某些功能以编程的方式嵌入到另一个系统中.这时使用Java API的方式就显

  • Java使用kafka发送和生产消息的示例

    1. maven依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> 2. 生产者代码 package com.lnho.example.kafka; import org.apache.kafka.c

  • JAVA通过HttpClient发送HTTP请求的方法示例

    HttpClient介绍 HttpClient 不是一个浏览器.它是一个客户端的 HTTP 通信实现库.HttpClient的目标是发 送和接收HTTP 报文.HttpClient不会去缓存内容,执行 嵌入在 HTML 页面中的javascript 代码,猜测内容类型,重新格式化请求/重定向URI,或者其它和 HTTP 运输无关的功能. HttpClient使用 使用需要引入jar包,maven项目引入如下: <dependency> <groupId>org.apache.htt

  • java开发微服务架构设计消息队列的水有多深

    目录 消息队列的作用 消息队列的设计难题 处理并发和顺序消息 处理重复消息 编写幂等消息处理器 跟踪消息并丢弃重复消息 处理事务性消息 使用数据库表作为消息队列 使用事务日志发布事件 RocketMQ事务消息解决方案 很多人在做架构设计时往往会"过度设计",简单问题复杂化,上来就引一堆中间件,我想大概原因主要有下面两点: 为了秀(学)技术而架构 我们常说技术是为业务服务的,不能为了技术而技术,为了秀技术引入一堆复杂架构这是要不得的. 考虑问题不全面,或者说广度不够,不知道如何简单化 举

  • Java服务假死之生产事故的排查与优化问题

    目录 一.现象 二.排查 1.打印堆栈 2.查看socket连接 3.查看JVM基本信息 4.查看GC日志 5.分析dump文件 三.优化 四.总结 一.现象 在服务器上通过curl命令调用一个Java服务的查询接口,半天没有任何响应.关于该服务的基本功能如下: 1.该服务是一个后台刷新指示器的服务,即该服务会将用户需要的指示器数据提前计算好,放入redis中,当用户请求指示器数据时便从redis中获取: 2.指示器涉及到的模型数据更新时会发送消息到kafka,该服务监听kafka消息,收到消息

  • RocketMQ生产消息与消费消息超详细讲解

    目录 1 RocketMQ简介 2 MQ的常见产品 3 环境搭建 4 单生产者单消费者模式 5 单生产者多消费者模式 5.1默认模式(负载均衡) 5.2广播模式 6 多生产者多消费者模式 1 RocketMQ简介 RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目.并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的

  • Java实现Kafka生产者和消费者的示例

    Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka的目标是为处理实时数据提供一个统一.高吞吐.低延迟的平台. 方式一:kafka-clients 引入依赖 在pom.xml文件中,引入kafka-clients依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId

  • 滴滴二面之Kafka如何读写副本消息的

    目录 前言 appendRecords-副本写入 副本读取:fetchMessages 总结 前言 无论是读取副本还是写入副本,都是通过底层的Partition对象完成的,而这些分区对象全部保存在上节课所学的allPartitions字段中.可以说,理解这些字段的用途,是后续我们探索副本管理器类功能的重要前提. 现在,我们就来学习下副本读写功能.整个Kafka的同步机制,本质上就是副本读取+副本写入,搞懂了这两个功能,你就知道了Follower副本是如何同步Leader副本数据的. append

  • Kafka Producer中的消息缓存模型图解详解

    目录 前言 什么是消息累加器RecordAccumulator 消息缓存模型 ProducerBatch的内存大小 内存分配 Batch的创建和释放 问题和答案 总结 前言 在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果. 发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗? 当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了? 当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不

  • kafka监控获取指定topic的消息总量示例

    我就废话不多说了,直接 上代码吧! import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataR

  • java实现邮件发送

    本文实例为大家分享了java实现邮件发送的具体代码,供大家参考,具体内容如下 1.使用socket通信功能实现java的邮件传输 1.1 什么是socet Socket的英文原义是"孔"或"插座".在网络编程中,网络上的两个程序通过一个双向的通信连接实现数据的交换,这个连接的一端称为一个socket. Socket套接字是通信的基石,是支持TCP/IP协议的网络通信的基本操作单元.它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议,本地

随机推荐