Java分布式学习之Kafka消息队列

目录
  • 介绍
  • Kafka核心相关名称
  • kafka集群安装
  • kafka使用
    • kafka文件存储
    • Springboot整合kafka

介绍

Apache Kafka 是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

注意:Kafka并没有遵循JMS规范(),它只提供了发布和订阅通讯方式。

kafka中文官网:http://kafka.apachecn.org/quickstart.html

Kafka核心相关名称

  1. Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群
  2. Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发
  3. massage: Kafka中最基本的传递对象。
  4. Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。Kafka里面实现分区,一个broker就是表示一个区域。
  5. Segment:partition物理上由多个segment组成,每个Segment存着message信息
  6. Producer : 生产者,生产message发送到topic
  7. Consumer : 消费者,订阅topic并消费message, consumer作为一个线程来消费
  8. Consumer Group:消费者组,一个Consumer Group包含多个consumer
  9. Offset:偏移量,理解为消息 partition 中消息的索引位置

主题和队列的区别:

队列是一个数据结构,遵循先进先出原则

kafka集群安装

参考官方文档:https://kafka.apachecn.org/quickstart.html

  • 每台服务器上安装jdk1.8环境
  • 安装Zookeeper集群环境
  • 安装kafka集群环境
  • 运行环境测试

安装jdk环境和zookeeper这里不详述了。

kafka为什么依赖于zookeeper:kafka会将mq信息存放到zookeeper上,为了使整个集群能够方便扩展,采用zookeeper的事件通知相互感知。

kafka集群安装步骤:

1、下载kafka的压缩包,下载地址:https://kafka.apachecn.org/downloads.html

2、解压安装包

tar -zxvf kafka_2.11-1.0.0.tgz

3、修改kafka的配置文件 config/server.properties

配置文件修改内容:

  • zookeeper连接地址:zookeeper.connect=192.168.1.19:2181
  • 监听的ip,修改为本机的iplisteners=PLAINTEXT://192.168.1.19:9092
  • kafka的brokerid,每台broker的id都不一样broker.id=0

4、依次启动kafka

./kafka-server-start.sh -daemon config/server.properties

kafka使用

kafka文件存储

topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生成的数据。Producer生成的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment,每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。

例如:执行命令新建一个主题,分三个区存放放在三个broker中:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kaico

  • 一个partition分为多个segment
  • .log 日志文件
  • .index 偏移量索引文件
  • .timeindex 时间戳索引文件
  • 其他文件(partition.metadata,leader-epoch-checkpoint)

Springboot整合kafka

maven依赖

 <dependencies>
        <!-- springBoot集成kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

yml配置

# kafka
spring:
  kafka:
    # kafka服务器地址(可以多个)
#    bootstrap-servers: 192.168.212.164:9092,192.168.212.167:9092,192.168.212.168:9092
    bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094
    consumer:
      # 指定一个默认的组名
      group-id: kafkaGroup1
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
      # 服务器地址
      bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094

生产者

@RestController
public class KafkaController {
	/**
	 * 注入kafkaTemplate
	 */
	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;
	/**
	 * 发送消息的方法
	 *
	 * @param key
	 *            推送数据的key
	 * @param data
	 *            推送数据的data
	 */
	private void send(String key, String data) {
		// topic 名称 key   data 消息数据
		kafkaTemplate.send("kaico", key, data);
	}
	// test 主题 1 my_test 3
	@RequestMapping("/kafka")
	public String testKafka() {
		int iMax = 6;
		for (int i = 1; i < iMax; i++) {
			send("key" + i, "data" + i);
		}
		return "success";
	}
}

消费者

@Component
public class TopicKaicoConsumer {
    /**
     * 消费者使用日志打印消息
     */
    @KafkaListener(topics = "kaico") //监听的主题
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic名称:" + consumer.topic() + ",key:" +
                consumer.key() + "," +
                "分区位置:" + consumer.partition()
                + ", 下标" + consumer.offset());
        //输出key对应的value的值
        System.out.println(consumer.value());
    }
}

到此这篇关于Java分布式学习之Kafka消息队列的文章就介绍到这了,更多相关Java Kafka内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java Kafka 消费积压监控的示例代码

    后端代码: Monitor.java代码: package com.suncreate.kafkaConsumerMonitor.service; import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; i

  • Java kafka如何实现自定义分区类和拦截器

    生产者发送到对应的分区有以下几种方式: (1)指定了patition,则直接使用:(可以查阅对应的java api, 有多种参数) (2)未指定patition但指定key,通过对key的value进行hash出一个patition: (3)patition和key都未指定,使用轮询选出一个patition. 但是kafka提供了,自定义分区算法的功能,由业务手动实现分布: 1.实现一个自定义分区类,CustomPartitioner实现Partitioner import org.apache

  • Java实现Kafka生产者和消费者的示例

    Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka的目标是为处理实时数据提供一个统一.高吞吐.低延迟的平台. 方式一:kafka-clients 引入依赖 在pom.xml文件中,引入kafka-clients依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId

  • java发送kafka事务消息的实现方法

    前言 事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的mysql,我们在程序开始时,只需要在程序中添加上事务注解即可 kafka客户端事务,直接使用客户端提供的相关的API即可,和jdbc事务的使用很类似,主要包含下面5个API // 1 初始化事务 void initTransactions(); // 2 开启事务 void beginTransaction() throws ProducerFen

  • Java实现Kafka生产者消费者代码实例

    Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者. 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键.值进行保存. 每一个Topic中都包含一个或多个物理分区(Partition),分区维护着消息的内容和索引,它们有可能被保存在不同服务器. 新建一个Maven项目,pom.xml 加入依赖: <dependency> <groupId>org.apache.kafka</gro

  • 浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式;2.并行方式 a.串

  • kafka生产者和消费者的javaAPI的示例代码

    写了个kafka的java demo 顺便记录下,仅供参考 1.创建maven项目 目录如下: 2.pom文件: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://mave

  • Java Kafka分区发送及消费实战

    目录 前言 业务场景 业务实现 不指定分区 指定分区 topic分区初始化及配置 生产者分区发送方案 消费者 前言 Kafka是现在非常热门的分布式消息队列,常用于微服务间异步通信,业务解耦等场景.kafka的性能非常强大,但是单个微服务吞吐性能是有上限的,我们就会用到分布式微服务,多消费者多生产者进行数据处理,保证性能同时也能根据业务量进行横向拓展,对于同一个微服务的多个实例,输入输出的topic是同一个,这时候我们就可以利用Kafka分区消费来解决这个问题. 业务场景 我们开发的是一个物联网

  • Java分布式学习之Kafka消息队列

    目录 介绍 Kafka核心相关名称 kafka集群安装 kafka使用 kafka文件存储 Springboot整合kafka 介绍 Apache Kafka 是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统. 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. 注意:Kafka并没有遵循JMS规范(

  • 大数据Kafka:消息队列和Kafka基本介绍

    目录 一.什么是消息队列 二.消息队列的应用场景 异步处理 应用耦合 限流削峰 消息驱动系统 三.消息队列的两种方式 点对点模式 发布/订阅模式 四.常见的消息队列的产品 1) RabbitMQ 2) activeMQ: 3) RocketMQ 4) kafka 五.Kafka的基本介绍 一.什么是消息队列 消息队列,英文名:Message Queue,经常缩写为MQ.从字面上来理解,消息队列是一种用来存储消息的队列 .来看一下下面的代码 上述代码,创建了一个队列,先往队列中添加了一个消息,然后

  • Spring boot 整合KAFKA消息队列的示例

    这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务. 一.添加依赖和添加配置项 1.1.在 Pom 文件中添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 1.2.添加配置项 spring: kafka: b

  • Java数据结构学习之栈和队列

    一.栈 1.1 概述 Java为什么要有集合类: 临时存储数据. 链表的本质: 对象间通过持有和引用关系互相关联起来. 线性表: 普通线性表, 操作受限线性表(某些操作受到限制 --> 某一个线性表它的增删改操作受到限制) --> 栈 & 队列 1.1.1 线性表的概念 (1)线性表:n个数据元素的有序序列. ①首先,线性表中元素的个数是有限的. ②其次,线性表中元素是有序的. (2)那这个"序"指的是什么呢? ①除表头和表尾元素外,其它元素都有唯一前驱和唯一后继,

  • 关于Kafka消息队列原理的总结

    目录 Kafka消息队列原理 Kafka的逻辑数据模型 Kafka的分发策略 Kafka的物理存储模型和查找数据的设计 Kafka的持久化策略设计 Kafka的节点间的数据一致性策略设计 Kafka的备份和负载均衡 Kafka消息队列内部实现原理 Kafka消息队列原理 最近在测试kafka的读写性能,所以借这个机会了解了kafka的一些设计原理,既然作为分布式系统,我们还是按照分布式的套路进行分析. Kafka的逻辑数据模型 生产者发送数据给服务端时,构造的是ProducerRecord<In

  • WCF分布式开发之MSMQ消息队列

    目录 一.MSMQ简介 1.MSMQ的实现原理 2.安装 3.两个概念 4.MicroSoft.Message.Queue常用的方法: 二.服务端 三.客户端 一.MSMQ简介 MSMQ(微软消息队列)是Windows操作系统中消息应用程序的基础,是用于创建分布式.松散连接的消息通讯应用程序的开发工具. MSMQ与XML Web Services和.Net Remoting一样,是一种分布式开发技术.但是在使用XML Web Services或.Net Remoting组件时,Client端需要

  • 通过pykafka接收Kafka消息队列的方法

    没有Kafka环境,所以也没有进行验证.感觉今后应该能用到,所以借抄在此,备查. pykafka使用示例,自动消费最新消息,不重复消费: # -* coding:utf8 *- from pykafka import KafkaClient host = '192.168.200.38' client = KafkaClient(hosts="%s:9092" % host) print client.topics # 生产者 # topicdocu = client.topics['

  • Java redisTemplate阻塞式处理消息队列

    目录 Redis 消息队列 redis五种数据结构 队列生产者 队列消费者 测试类 并发情况下使用increment递增 补充 Redis 消息队列 redis五种数据结构 队列生产者 package cn.stylefeng.guns.knowledge.modular.knowledge.schedule; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; i

  • KOA+egg.js集成kafka消息队列的示例

    Egg.js : 基于KOA2的企业级框架 Kafka:高吞吐量的分布式发布订阅消息系统 本文章将集成egg + kafka + mysql 的日志系统例子 系统要求:日志记录,通过kafka进行消息队列控制 思路图: 这里消费者和生产者都由日志系统提供 λ.1 环境准备 ①Kafka 官网下载kafka后,解压 启动zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties 启动Kafka server 这里conf

  • kafka 消息队列中点对点与发布订阅的区别说明

    目录 背景知识 1.JMS中定义 2.二者分析与区别 2.1 点对点模式 2.2 发布订阅模式 3.流行的消息队列模型比较 3.1 RabbitMQ 3.2 Kafka 背景知识 JMS一个在 Java标准化组织(JCP)内开发的标准(代号JSR 914).2001年6月25日,Java消息服务发布JMS 1.0.2b,2002年3月18日Java消息服务发布 1.1. Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM

随机推荐