.Net Core 集成 Kafka的步骤

目录
  • kafka
    • broker
    • topic
    • partition
    • consumer group
  • 安装kafka
  • .net 操作 kafka
  • 生产者
  • 消费者
  • 运行一下
  • 总结

最近维护的一个系统并发有点高,所以想引入一个消息队列来进行削峰。考察了一些产品,最终决定使用kafka来当做消息队列。以下是关于kafka的一些知识的整理笔记。

kafka

kafka 是分布式流式平台。它由linkedin开发,后贡献给了Apache开源组织并成为顶级开源项目。它可以应用在高并发场景下的日志系统,也可以当作消息队列来使用,也可以当作消息服务对系统进行解耦。

流处理平台有以下三种特性:

  • 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
  • 可以储存流式的记录,并且有较好的容错性。
  • 可以在流式记录产生时就进行处理。

一般它可以应用于两个场景:

  • 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
  • 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

broker

kafka中的每个节点即每个服务器就是一个broker 。

topic

kafka中的topic是一个分类的概念,表示一类消息。生产者在生产消息的时候需要指定topic,消费者在消费消息的时候也需要指定topic。

partition

partition是分区的概念。kafka的一个topic可以有多个partition。每个partition会分散到不同的broker上,起到负载均衡的作用。生产者的消息会通过算法均匀的分散在各个partition上。

consumer group

kafka的消费者有个组的概念。一个partition可以被多consumer group订阅。每个消息会广播到每一个group中。但是每个消息只会被group中的一个consumer消费。相当于每个group,一个partition只能有一个consumer订阅,所以group中的consumer数量不可以超过topic中partition的数量。并且消息的消费的顺序在每个partition中是保证有序的,但是在多个partition之间是不保证的,因为consumer的消费速度是有快慢的。
所以如果要用kafka实现严格的消息队列点对点模式那么我们可以设置一个partition并且设置一个consumer。如果对消息消费的顺序不是那么敏感,那么可以设置多个partition来并行消费消息,提高吞吐量。

安装kafka

为了能体验下kafka,我们还是要实际安装一下kafka,毕竟空想是没有用的。现在有了docker,安装起来也是相当滴简单。我们只需要定义好docker-compose的yml就行了。

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117
      KAFKA_CREATE_TOPICS: "test:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

1.我们在yml里定义2个service:

2.zookeeper,kafka的分布式依赖zookeeper,所以我需要先定义它。
kafka ,kafka的定义有几个地方要注意的。

  • depends_on:zookeeper 指定kafka依赖zookeeper这个service,当启动kafka的时候自动会启动zookeeper。
  • KAFKA_ADVERTISED_HOST_NAME 这里要指定宿主机的ip
  • KAFKA_CREATE_TOPICS 这个变量只是的默认创建的topic。"test:3:1"代表创建一个名为test的topic并且创建3个分区1个复制。

定义好这些之后我们只需要使用docker-compose命令运行它:

sudo docker-compose up -d

.net 操作 kafka

安装好kafka的docker环境之后,下面演示下如何使用.net操作kafka,进行消息的生产与消费。

生产者

        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World Producer!");

            var config = new ProducerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                ClientId = Dns.GetHostName(),
            };

            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                string topic = "test";
                for (int i = 0; i < 100; i++)
                {
                    var msg = "message " + i;
                    Console.WriteLine($"Send message:   value {msg}");
                    var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });
                    Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");
                    Thread.Sleep(500);
                }
            }

            Console.ReadLine();

        }

新建一个控制台项目,从nuget安装kafka的官方client。

Install-Package Confluent.Kafka

代码非常简单,使用ProducerBuilder构造一个producer,然后调用ProduceAsync方法发送消息。
其中需要注意的是如果你的场景并发非常之高,官方文档推荐的方法是Produce而不是ProduceAsync。这是一个比较迷的地方。按常理使用ProduceAsync应该比使用同步方法Produce能获得更高的并发才对。但是文档确确实实说高并发场景请使用Produce。可能是为了避免ProduceAsync结果返回的时候异步线程上下文切换造成的性能开销。
原文:

There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher level Task based API.

消费者

        static void Main(string[] args)
        {
            Console.WriteLine("Hello World kafka consumer !");

            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            var cancel = false;

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                var topic = "test";
                consumer.Subscribe(topic);

                while (!cancel)
                {
                    var consumeResult = consumer.Consume(CancellationToken.None);

                    Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                }

                consumer.Close();
            }
        }

消费者的演示代码同样很简单。我们需要指定groupId,然后订阅topic。使用ConsumerBuilder构造一个consumer,然后调用Consume方法进行消费就可以。
注意:
这里默认是自动commit消费。你也可以根据情况手动提交commit。

运行一下

我们运行一个生产者进程,按照500ms的速度生产消息。运行三个consumer进行消费,可以看到消息被均匀的推送到三个consumer上去。

总结

以上简单的介绍了kafka的背景、安装方法、使用场景。还简单演示了如何使用.net来操作kafka。它可以当作流式计算平台来使用,也可以当作传统的消息队列使用。它当前非常流行,网上的资料也多如牛毛。官方也提供了简单易用的.net sdk ,为.net 平台集成kafka提供了便利。

以上就是.Net Core 集成 Kafka的步骤的详细内容,更多关于.Net Core 集成 Kafka的资料请关注我们其它相关文章!

(0)

相关推荐

  • asp.net core集成CKEditor实现图片上传功能的示例代码

    背景 本文为大家分享了asp.net core 如何集成CKEditor ,并实现图片上传功能的具体方法,供大家参考,具体内容如下. 准备工作 1.visual studio 2019 开发环境 2.net core 2.0 及以上版本 实现方法 1.新建asp.net core web项目 2.下载CKEditor 这里我们新建了一个系统自带的样本项目,去 CKEditor官网下载一个版本,解压后拷贝大wwwroot中 3.增加图片上传控制器 @using CompanyName.Projec

  • asp.net core 使用 TestServer 来做集成测试的方法

    Intro 之前我的项目里的集成测试是随机一个端口,每次都真实的启动一个 WebServer,之前也有看到过微软文档上 TestServer 的介绍,当时没仔细看过以为差不多就没用,一直是启动了一个真正的 WebServer 去跑集成测试的,上次分享 Xunit.DependencyInjection 改造测试项目的时候,写的烂代码被大师看到了之后, 大师建议用 TestServer 来做集成测试,使用 TestServer 不会真正的占用端口号,不会出现权限问题,于是扒了扒 TestServe

  • 在ASP.NET Core Mvc集成MarkDown的方法

    这几天在做文章编辑,首先就想到了markdown,它比其它的都要新,而且很好用,相对于其它的html编辑器,好久不更新,要好得多,哦~对了我现在已经用上新版的Edge了,经过很多朋友测试,性能比谷歌浏览器都要好很多,并且资源消耗也相对来说小. 一.前提 好吧,言归正传,你首先需要下载MarkDown的相关样式脚本资源,下载完毕之后拖放你的ASP.NET Core Mvc 项目中的wwwroot中. 二.初始化 在页面中我们理所当然需要引用css 脚本资源,随后调用它的初始化方法. <script

  • 如何在Asp.Net Core中集成Refit

    在很多时候我们在不同的服务之间需要通过HttpClient进行及时通讯,在我们的代码中我们会创建自己的HttpClient对象然后去跨领域额进行数据的交互,但是往往由于一个项目有多个人开发所以在开发中没有人经常会因为不同的业务请求去写不同的代码,然后就会造成各种风格的HttpClient的跨域请求,最重要的是由于每个人对HttpClient的理解程度不同所以写出来的代码可能质量上会有参差不齐,即使代码能够达到要求往往也显得非常臃肿,重复高我们在正式介绍Refit这个项目之前,我们来看看我们在项目

  • 如何在Asp.Net Core中集成ABP Dapper

    在实际的项目中,除了集成ABP框架的EntityFrameworkCore以外,在有些特定的场景下不可避免地会使用一些SQL查询语句,一方面是由于现在的EntityFrameworkCore2.X有些问题没有解决,另外一方面是基于性能方面的考虑,在了解本篇内容之前,首先还是来看看官方文档来给出的说明. 按照官方的介绍整体可以分为下面的步骤:1 安装依赖包.2 添加DependsOn属性标签.3 Entity to Table Mapping. 4 Usage 通过上面的4个步骤我们就能够正常在A

  • .NET Core下使用Kafka的方法步骤

    安装 CentOS安装 kafka Kafka : http://kafka.apache.org/downloads ZooLeeper : https://zookeeper.apache.org/releases.html 下载并解压 # 下载,并解压 $ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz $ tar -zxvf kafka_2.12-2.1.1.tgz $ mv kafka_2.12

  • 基于Jenkins搭建.NET Core持续集成环境过程图解

    我们用NuGet还原.NET Core项目会报以下错误: error NETSDK1064: 未找到版本为 1.8.2 的包 BouncyCastle.NetCore.它可能已在 NuGet 还原后删除.否则,NuGet 还原可能只是部分完成,这种情况可能是最大路径长度限制所导致. 关于这个问题我找了好久 也是google到一条信息才明白 这里就直接放解决办法,其实网上就有方法,有的简短,有的笼统 我们这里用到的是dotnet命令来还原.构建.以及部署 从 .NET Core 2.0 开始,无需

  • asp.net core集成JWT的步骤记录

    [什么是JWT] JSON Web Token(JWT)是目前最流行的跨域身份验证解决方案. JWT的官网地址:https://jwt.io/ 通俗地来讲,JWT是能代表用户身份的令牌,可以使用JWT令牌在api接口中校验用户的身份以确认用户是否有访问api的权限. JWT中包含了身份认证必须的参数以及用户自定义的参数,JWT可以使用秘密(使用HMAC算法)或使用RSA或ECDSA的公钥/私钥对进行签名. [什么时候应该使用JSON Web令牌?] 授权:这是使用JWT的最常见方案.一旦用户登录

  • .Net Core 集成 Kafka的步骤

    目录 kafka broker topic partition consumer group 安装kafka .net 操作 kafka 生产者 消费者 运行一下 总结 最近维护的一个系统并发有点高,所以想引入一个消息队列来进行削峰.考察了一些产品,最终决定使用kafka来当做消息队列.以下是关于kafka的一些知识的整理笔记. kafka kafka 是分布式流式平台.它由linkedin开发,后贡献给了Apache开源组织并成为顶级开源项目.它可以应用在高并发场景下的日志系统,也可以当作消息

  • SpringBoot集成Kafka的步骤

    SpringBoot集成Kafka 本篇主要讲解SpringBoot 如何集成Kafka ,并且简单的 编写了一个Demo 来测试 发送和消费功能 前言 选择的版本如下: springboot : 2.3.4.RELEASE spring-kafka : 2.5.6.RELEASE kafka : 2.5.1 zookeeper : 3.4.14 本Demo 使用的是 SpringBoot 比较高的版本 SpringBoot 2.3.4.RELEASE 它会引入 spring-kafka 2.5

  • ASP.NET Core 集成 React SPA应用的步骤

    目录 wwwroot\ui ReactUIMiddleware 运行一下 总结 AgileConfig的UI使用react重写快完成了.上次搞定了基于jwt的登录模式(AntDesign Pro + .NET Core 实现基于JWT的登录认证),但是还有点问题.现在使用react重写后,agileconfig成了个确确实实的前后端分离项目.那么其实部署的话要分2个站点部署,把前端build完的静态内容部署在一个网站,把server端也部署在一个站点.然后修改前端的baseURL让spa的api

  • asp.net core集成MongoDB的完整步骤

    一.前言及MongoDB的介绍 最近在整合自己的框架,顺便把MongoDBD的最简单CRUD重构一下作为组件化集成到asp.net core项目中,当然此篇文章中没有讲解mongodb的集群部署,等有机会分享一下. 首先,我们在MongoDB的官方文档中看到,MongoDb的2.4以上的For .Net的驱动是支持.Net Core 2.0的. 针对MongoDB,我想大家应该不陌生,没有用过也有听过. 1.mongodb是什么? MongoDB是一个基于分布式文件存储的数据库,为web应用提供

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • springboot 1.5.2 集成kafka的简单例子

    本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下: 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便. 添加依赖 compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE") 添加application.properties #kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-server

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • nginx lua集成kafka的实现方法

    第一步:进入opresty目录 [root@node03 openresty]# cd /export/servers/openresty/ [root@node03 openresty]# ll total 356 drwxr-xr-x 2 root root 4096 Jul 26 11:33 bin drwxrwxr-x 44 1000 1000 4096 Jul 26 11:31 build drwxrwxr-x 43 1000 1000 4096 Nov 13 2017 bundle

  • Spring纯Java配置集成kafka代码实例

    这篇文章主要介绍了Spring纯Java配置集成kafka代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 KafkaConfig.java package com.niugang.config; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache

随机推荐