kafka生产实践(详解)

1.引言

最近接触到一个APP流量分析的项目,类似于友盟。涉及到几个C端(客户端)高并发的接口,这几个接口主要用于C端数据的提交。在没有任何缓冲的情况下,一个接口涉及到5张表的提交。压测的结果很不理想,主要瓶颈就在与RDS的交互。

一台双核,16G机子,单实例,jdbc最大连接数100,吞吐量竟然只有50TPS。

能想到的改造方案就是引入一层缓冲,让C端接口不与RDS直接交互,很自然就想到了rabbitmq,但是rabbitmq对分布式的支持比较一般,我们的数据体量也比较大,所以我们借鉴了友盟,引入了kafka,Kafka是一种高吞吐量的分布式发布订阅消息系统,起初在不做任何kafka优化的时候,简单地将C端提交的数据直接send到单节点kafka,就这样,我们的吞吐量达到了100TPS.还是有点小惊喜的。

最近一段时间研究了一下kafka,对一些参数进行调整,目前接口的吞吐量已经达到220TPS,写这篇文章主要想记录一下自己优化和部署经历。

2.kafka简介

kafka的结构图

这张图很好的诠释了kafka的结构,但是遗漏了一点,就是group的概念,我这里补充一下,一个组可以包含多个consumer对多个topic进行消费,但是不同组的消费都是独立的。

也就是说同一个topic的同一条消息可以被不同组的consumer消费。

我这里的主要的优化途径就是将kafka集群化,多partition化,使其并发度更高。

集群化都很好理解,那什么是多partition?

partition是topic的一个概念,即对topic进行分组,不同partition之间的消费相互独立,并且有序。并且一个partiton只能被一个消费者消费,所以咯,假如topic只有一个partition的话,那么消费者实例不能大于一个,那实例再多也没用,受限于kafka的partition。

上面都是讲消费,其实send操作也是一样的,要保证有序必然要等上一个发送ack之后,下一个发送才能进行,如果只有一个partition,那send之后的ack的等待时间必然会阻塞下面一次send,设计多个partition之后,可以同时往多个partition发送消息,自然吞吐量也就上去。

3.kafka集群的搭建以及参数配置

集群搭建

准备两台机子,然后去官网(http://kafka.apache.org/downloads)下载一个包。通过scp到服务器上,解压进入config目录,编辑server.config.

第一台机子配置(172.18.240.36):

broker.id=0 每台服务器的broker.id都不能相同

#hostname
host.name=172.18.240.36

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口
zookeeper.connect=172.18.240.36:4001
#默认partition数
num.partitions=2

第二台机子配置(172.18.240.62):

broker.id=1 每台服务器的broker.id都不能相同

#hostname
host.name=172.18.240.62

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口
zookeeper.connect=172.18.240.36:4001
#默认partition数
num.partitions=2

新增或者修改成以上配置。

对了,在此之前请先安装zookeeper,如果你用的是zookeeper集群的话,zookeeper.connect可以填写多个,中间用逗号隔开。

然后启动

nohup ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

测试一下:

在第一台机子上开启一个producer

./kafka-console-producer.sh --broker-list 172.18.240.36:9092 --topic test-test

在第二台机子上开启一个consumer

./kafka-console-consumer.sh --bootstrap-server 172.18.240.62:9092 --topic test-test --from-beginning

第一台机子发送一条消息

第二台机子立马收到消息

这样kafka的集群部署就完成了。就下来我们来看看,java的客户端代码如何编写。

4.kafka客户端代码示例

我这里的工程是建立在spring boot 之下的,仅供参考。

在 application.yml下添加如下配置:

kafka:
 consumer:
 default:
  server: 172.18.240.36:9092,172.18.240.62:9092
  enableAutoCommit: false
  autoCommitIntervalMs: 100
  sessionTimeoutMs: 15000
  groupId: data_analysis_group
  autoOffsetReset: latest
 producer:
 default:
  server: 172.18.240.36:9092,172.18.240.62:9092
  retries: 0
  batchSize: 4096
  lingerMs: 1
  bufferMemory: 40960

添加两个配置类

package com.dtdream.analysis.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

import java.util.HashMap;
import java.util.Map;

@ConfigurationProperties(
  prefix = "kafka.consumer.default"
)
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

 private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
 private static String autoCommitIntervalMs;

 private static String sessionTimeoutMs;

 private static Class keyDeserializerClass = StringDeserializer.class;

 private static Class valueDeserializerClass = StringDeserializer.class;

 private static String groupId = "test-group";

 private static String autoOffsetReset = "latest";

 private static String server;

 private static boolean enableAutoCommit;

 public static String getServer() {
  return server;
 }

 public static void setServer(String server) {
  KafkaConsumerConfig.server = server;
 }

 public static boolean isEnableAutoCommit() {
  return enableAutoCommit;
 }

 public static void setEnableAutoCommit(boolean enableAutoCommit) {
  KafkaConsumerConfig.enableAutoCommit = enableAutoCommit;
 }

 public static String getAutoCommitIntervalMs() {
  return autoCommitIntervalMs;
 }

 public static void setAutoCommitIntervalMs(String autoCommitIntervalMs) {
  KafkaConsumerConfig.autoCommitIntervalMs = autoCommitIntervalMs;
 }

 public static String getSessionTimeoutMs() {
  return sessionTimeoutMs;
 }

 public static void setSessionTimeoutMs(String sessionTimeoutMs) {
  KafkaConsumerConfig.sessionTimeoutMs = sessionTimeoutMs;
 }

 public static Class getKeyDeserializerClass() {
  return keyDeserializerClass;
 }

 public static void setKeyDeserializerClass(Class keyDeserializerClass) {
  KafkaConsumerConfig.keyDeserializerClass = keyDeserializerClass;
 }

 public static Class getValueDeserializerClass() {
  return valueDeserializerClass;
 }

 public static void setValueDeserializerClass(Class valueDeserializerClass) {
  KafkaConsumerConfig.valueDeserializerClass = valueDeserializerClass;
 }

 public static String getGroupId() {
  return groupId;
 }

 public static void setGroupId(String groupId) {
  KafkaConsumerConfig.groupId = groupId;
 }

 public static String getAutoOffsetReset() {
  return autoOffsetReset;
 }

 public static void setAutoOffsetReset(String autoOffsetReset) {
  KafkaConsumerConfig.autoOffsetReset = autoOffsetReset;
 }

 @Bean
 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setConcurrency(10);
  factory.getContainerProperties().setPollTimeout(3000);
  factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
   @Override
   public boolean filter(ConsumerRecord<String, String> consumerRecord) {
    log.debug("partition is {},key is {},topic is {}",
      consumerRecord.partition(), consumerRecord.key(), consumerRecord.topic());
    return false;
   }
  });
  return factory;
 }

 private ConsumerFactory<String, String> consumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }

 private Map<String, Object> consumerConfigs() {
  Map<String, Object> propsMap = new HashMap<>();
  propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
  propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
  propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
  propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
  propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
  propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
  propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  return propsMap;

 }

 /* @Bean
 public Listener listener() {
  return new Listener();
 }*/
}
package com.dtdream.analysis.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 * User: chenqimiao
 * Date: 2017/7/24
 * Time: 9:43
 * To change this template use File | Settings | File Templates.
 */
@ConfigurationProperties(
  prefix = "kafka.producer.default",
  ignoreInvalidFields = true
)//注入一些属性域
@EnableKafka
@Configuration//使得@Bean注解生效
public class KafkaProducerConfig {
 private static String server;
 private static Integer retries;
 private static Integer batchSize;
 private static Integer lingerMs;
 private static Integer bufferMemory;
 private static Class keySerializerClass = StringSerializer.class;
 private static Class valueSerializerClass = StringSerializer.class;

 private Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
  props.put(ProducerConfig.RETRIES_CONFIG, retries);
  props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
  props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
  return props;
 }

 private ProducerFactory<String, String> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
 }

 public static String getServer() {
  return server;
 }

 public static void setServer(String server) {
  KafkaProducerConfig.server = server;
 }

 public static Integer getRetries() {
  return retries;
 }

 public static void setRetries(Integer retries) {
  KafkaProducerConfig.retries = retries;
 }

 public static Integer getBatchSize() {
  return batchSize;
 }

 public static void setBatchSize(Integer batchSize) {
  KafkaProducerConfig.batchSize = batchSize;
 }

 public static Integer getLingerMs() {
  return lingerMs;
 }

 public static void setLingerMs(Integer lingerMs) {
  KafkaProducerConfig.lingerMs = lingerMs;
 }

 public static Integer getBufferMemory() {
  return bufferMemory;
 }

 public static void setBufferMemory(Integer bufferMemory) {
  KafkaProducerConfig.bufferMemory = bufferMemory;
 }

 public static Class getKeySerializerClass() {
  return keySerializerClass;
 }

 public static void setKeySerializerClass(Class keySerializerClass) {
  KafkaProducerConfig.keySerializerClass = keySerializerClass;
 }

 public static Class getValueSerializerClass() {
  return valueSerializerClass;
 }

 public static void setValueSerializerClass(Class valueSerializerClass) {
  KafkaProducerConfig.valueSerializerClass = valueSerializerClass;
 }

 @Bean(name = "kafkaTemplate")
 public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
 }
}

利用kafkaTemplate即可完成发送。

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

 @RequestMapping(
   value = "/openApp",
   method = RequestMethod.POST,
   produces = MediaType.APPLICATION_JSON_UTF8_VALUE,
   consumes = MediaType.APPLICATION_JSON_UTF8_VALUE
 )
 @ResponseBody
 public ResultDTO openApp(@RequestBody ActiveLogPushBo activeLogPushBo, HttpServletRequest request) {

  logger.info("openApp: activeLogPushBo {}, dateTime {}", JSONObject.toJSONString(activeLogPushBo),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));

  String ip = (String) request.getAttribute("ip");

  activeLogPushBo.setIp(ip);

  activeLogPushBo.setDate(new Date());

  //ResultDTO resultDTO = dataCollectionService.collectOpenInfo(activeLogPushBo);

  kafkaTemplate.send("data_collection_open",JSONObject.toJSONString(activeLogPushBo));

  // logger.info("openApp: resultDTO {} ,dateTime {}", resultDTO.toJSONString(),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));

  return new ResultDTO().success();
 }

kafkaTemplate的send方法会更根据你指定的key进行hash,再对partition数进行去模,最后决定发送到那一个分区,假如没有指定key,那send方法对分区的选择是随机。具体怎么随机的话,这里就不展开讲了,有兴趣的同学可以自己看源码,我们可以交流交流。

接着配置一个监听器

package com.dtdream.analysis.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Optional;
@Component
public class Listener {

 private Logger logger = LoggerFactory.getLogger(this.getClass());

 @KafkaListener(topics = {"test-topic"})
 public void listen(ConsumerRecord<?, ?> record) {
  Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  if (kafkaMessage.isPresent()) {
   Object message = kafkaMessage.get();
   logger.info("message is {} ", message);
  }
 }
}

@KafkaListener其实可以具体指定消费哪个分区,如果不指定的话,并且只有一个消费者实例,那么这个实例会消费所有的分区的消息。

消费者的数量是一定要少于partition的数量的,不然没有任何意义。会出现消费者过剩的情况。

消费者数量和partition数量的多与少,会动态影响消费节点所消费的partition数目,最终会在整个集群中达到一种动态平衡。

5.总结

理论上只要cpu核心数无限,那么partition数也可以无上限,与此同时消费者节点和生产者节点也可以无上限,最终会使单个topic的并发无上限。单机的cpu的核心数总是会达到一个上限,kafka作为分布式系统,可以很好利用集群的运算能力,进行动态扩展,在DT时代,应该会慢慢成为主流吧。

以上这篇kafka生产实践(详解)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • kafka生产实践(详解)

    1.引言 最近接触到一个APP流量分析的项目,类似于友盟.涉及到几个C端(客户端)高并发的接口,这几个接口主要用于C端数据的提交.在没有任何缓冲的情况下,一个接口涉及到5张表的提交.压测的结果很不理想,主要瓶颈就在与RDS的交互. 一台双核,16G机子,单实例,jdbc最大连接数100,吞吐量竟然只有50TPS. 能想到的改造方案就是引入一层缓冲,让C端接口不与RDS直接交互,很自然就想到了rabbitmq,但是rabbitmq对分布式的支持比较一般,我们的数据体量也比较大,所以我们借鉴了友盟,

  • spring-cloud-stream结合kafka使用详解

    1.pom文件导入依赖 <!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> 2.application.yml文件配置 spring: cloud: stream: kafka: bind

  • Spring Boot整合Kafka教程详解

    目录 正文 步骤一:添加依赖项 步骤二:配置 Kafka 步骤三:创建一个生产者 步骤四:创建一个消费者 正文 本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka.Kafka 是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量. 在本教程中,我们将使用 Spring Boot 2.5.4 和 Kafka 2.8.0. 步骤一:添加依赖项 在 pom.xml 中添加以下依赖项: <dependency> <groupId>org.springfra

  • kafka与storm集群环境的安装步骤详解

    前言 在开始之前,需要说明下,storm和kafka集群安装是没有必然联系的,我将这两个写在一起,是因为他们都是由zookeeper进行管理的,也都依赖于JDK的环境,为了不重复再写一遍配置,所以我将这两个写在一起.若只需一个,只需挑选自己选择的阅读即可.下面话不多说了,来一起看看详细的介绍吧. 这两者的依赖如下: Storm集群:JDK1.8 , Zookeeper3.4,Storm1.1.1: Kafa集群 : JDK1.8 ,Zookeeper3.4 ,Kafka2.12: 说明: Sto

  • 在Spring Boot应用程序中使用Apache Kafka的方法步骤详解

    第1步:生成我们的项目: Spring Initializr来生成我们的项目.我们的项目将提供Spring MVC / Web支持和Apache Kafka支持. 第2步:发布/读取Kafka主题中的消息: <b>public</b> <b>class</b> User { <b>private</b> String name; <b>private</b> <b>int</b> age

  • Laravel中Kafka的使用详解

    本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类. 以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php <?php namespace App\Tools; use Illuminate\Config\Repository;

  • PHP扩展之kafka安装应用案例详解

    话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用. 实话说,如果用于队列的话,跟PHP比较配的,还是Redis.用的顺手,呵呵,只是Redis不能有多个consumer.但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的.下面就开始讲Kafka的安装吧.我以CentOS6.4为例,64位. 一. 首先确认下jdk有没有安装 使用命令 [root@localhost ~]# java -version java version

  • 详解kafka中的消息分区分配算法

    目录 背景 RangeAssignor 定义 源码分析 场景 RoundRobinAssignor 定义 源码分析 场景 StickyAssignor 定义 场景 背景 kafka有分区机制,一个主题topic在创建的时候,会设置分区.如果只有一个分区,那所有的消费者都订阅的是这一个分区消息:如果有多个分区的话,那消费者之间又是如何分配的呢? 分配算法 RangeAssignor 定义 Kafka默认采⽤RangeAssignor的分配算法. RangeAssignor策略的原理是按照消费者总数

  • Go操作Kafka和Etcd方法详解

    目录 操作Kafka sarama 下载及安装 注意事项 连接 kafka 发送消息 连接 kafka 消费消息 操作Etcd 安装 put和get操作 watch操作 安装报错: 操作Kafka Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能.持久化.多副本备份.横向扩展等特点.本文介绍了如何使用 Go 语言发送和接收 kafka 消息. sarama Go 语言中连接 kafka 使用第三方库:github.com/Shopify

  • Kafka消费客户端协调器GroupCoordinator详解

    目录 协调器的生命周期 GroupCoordinator的创建 offsetConfig相关配置 groupConfig相关配置 groupMetadataManager heartbeatPurgatory GroupCoordinator的启动 GroupCoordinator OnElection GroupCoordinator onResignation 协调器的生命周期 什么是协调器 协调器工作原理 协调器的Rebalance机制 GroupCoordinator的创建 在Kafka

随机推荐