云服务器(Linux)安装部署Kafka的详细过程

目录
  • 云服务器(Linux)安装部署Kafka
    • 前期准备
    • 下载安装包
    • 上载安装包到云服务器
    • 配置kafka
    • 开放云服务器端口
    • 开放linux防火墙端口
    • 启动kafka服务
    • 测试单机连通性
    • Springboot连接kafak

云服务器(Linux)安装部署Kafka

前期准备

kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8。

下载安装包

官网地址:

较新的版本已自带Zookeeper,无需额外下载。这里使用3.2.0做演示。

注意要下载Binary downloads标签下的tgz包,Source download标签下的包为源码。无法直接运行,需要编译。

上载安装包到云服务器

使用ssh连接工具将kafka_2.12-3.2.0.tgz这个包上传到云服务器上的一个目录。

打开命令行,进入到放有压缩包的目录,执行

tar -zxvf kafka_2.12-3.2.0.tgz

配置kafka

然后使用cd命令进入到/kafka_2.12-3.2.0/config/下,使用

vi server.properties

编辑配置文件。

删除listeners和advertised前方的#号,改成如下配置:

listeners=PLAINTEXT://云服务器内网ip:9092(本地访问用本地ip)
# 如果要提供外网访问则必须配置此项
advertised.listeners=PLAINTEXT://云服务器公网ip:9092(若要远程访问需配置此项为云服务器的公网ip)
# zookeeper连接地址,集群配置格式为ip:port,ip:port,ip:port
zookeeper.connect=云服务器公网ip:2181

开放云服务器端口

在云服务器控制台内进入安全组页面,添加两条新的入站规则,tcp/9092和tcp/2181

开放linux防火墙端口

先查看使用的防火墙类型iptables/firewalld

iptables操作命令

1.打开/关闭/重启防火墙

开启防火墙(重启后永久生效):chkconfig iptables on

关闭防火墙(重启后永久生效):chkconfig iptables off

开启防火墙(即时生效,重启后失效):service iptables start

关闭防火墙(即时生效,重启后失效):service iptables stop

重启防火墙:service iptables restartd

2.查看打开的端口

/etc/init.d/iptables status
3.开启端口

iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
4.保存并重启防火墙
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart

Centos7默认安装了firewalld,如果没有安装的话,可以使用 yum install firewalld firewalld-config进行安装。

操作指令如下:

1.启动防火墙

systemctl start firewalld
2.禁用防火墙

systemctl stop firewalld
3.设置开机启动

systemctl enable firewalld
4.停止并禁用开机启动

sytemctl disable firewalld
5.重启防火墙

firewall-cmd --reload

6.查看状态

systemctl status firewalld或者 firewall-cmd --state
7.在指定区域打开端口(记得重启防火墙)

firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)

打开tcp/9092和tcp/2181这两个端口后,重启防火墙,并查看开放的端口确实生效。

启动kafka服务

cd命令进入kafka_2.12-3.2.0目录下,执行

bin/zookeeper-server-start.sh config/zookeeper.properties

启动zookeeper,不加-daemon方便排除启动错误,新建一个shell窗口,进入该目录再执行

bin/kafka-server-start.sh config/server.properties

启动kafka,若打印日志未报错,若未出现error日志,说明启动成功。

测试单机连通性

查询kafka下所有的topic
bin/kafka-topics.sh --list --zookeeper ip:port
因为kafka使用zookeeper作为配置中心,一些topic信息需要查询该kafka对应的zookeeper
创建topic
bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
开启生产者
bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
开启消费者
bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test

Springboot连接kafak

在pom.xml文件中引入kafka依赖

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.0</version>
        </dependency>

在application.yml配置文件中配置kafka

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 云服务器外网ip地址:9092
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

生产者

@RestController
public class KafkaController {
    private final static String TOPIC_NAME = "test-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        kafkaTemplate.send(TOPIC_NAME, "key", msg);
        return String.format("消息 %s 发送成功!", msg);
    }
}

消费者

@Component
public class DemoConsumer {
    /**
     * @param record record
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     * @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     * @TopicPartition(topic = "topic2", partitions = "0",
     * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     * },concurrency = "6")
     * //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
     */
    @KafkaListener(topics = "test-topic", groupId = "testGroup1")
    public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup1 message: " + value);
        System.out.println("testGroup1 record: " + record);
        //手动提交offset,一般是提交一个banch,幂等性防止重复消息
        // === 每条消费完确认性能不好!
        ack.acknowledge();
    }

    //配置多个消费组
    @KafkaListener(topics = "test--topic", groupId = "testGroup2")
    public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup2 message: " + value);
        System.out.println("testGroup2 record: " + record);
        //手动提交offset
        ack.acknowledge();
    }
}

使用swagger测试发送消息

控制台打印消息

到此这篇关于云服务器(Linux)安装部署Kafka的详细过程的文章就介绍到这了,更多相关Linux安装Kafka内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Linux下Kafka单机安装配置方法(图文)

    介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计.这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: •Kafka将消息以topic为单位进行归纳. •将向Kafka topic发布消息的程序成为producers. •将预订topics并消费消息的程序成为consumer. •Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker. producers通过网络将消息发送到Kafka集群,集群

  • 阿里云云服务器Linux系统更新yum源Shell脚本

    所有执行的脚本都需要root身份来执行,执行方法:以root身份执行命令:bash xxx.sh 功能:自动检测系统并更新源 适用系统版本:兼容线上所有linux版本 执行方法:以root身份执行命令:bash update_source.sh 解决了什么问题:一键式检测系统并更新源 给客户带来了好处:用户只需执行该脚本一次即可自动检测系统并更新源. 备注:由于系统版本都有支持的周期所以部分源可能会出现不可用的情况,包括官方的源,这是正常情况 #!/bin/bash ##############

  • 阿里云ECS云服务器(linux系统)安装mysql后远程连接不了(踩坑)

    昨天买了一年的阿里云服务器,系统是linux Centos7的,满怀憧憬的装了个mysql,接下来的一天让我差点怀疑人生... 怎么装mysql就不多说了,反正我装了三遍,每次在阿里云上都能本地连接数据库 用navcat远程连阿里云的数据库死活连接不上.始终报:2003 - Can't connect to MySQL SERVER ON ********* (10060) 装了三次你就知道我有多绝望了,因为第一次linux下安装mysql,每次都认为可能是安装出了问题, 百度都被我翻烂了...

  • 在Linux环境下安装Kafka

    目录 二.生产与消费 2.1 kafka-topics.sh 用于管理主题 2.2 kafka-console-consumer.sh用于消费消息 2.3 kafka-console-producer.sh用于生产消息 2.4 具体操作 一.环境准备 jdk下载地址链接:下载地址 zookeeper下载地址链接:下载地址 kafka下载地址链接:下载地址 1.1 Java环境为前提 1.1.1 上传jdk-8u261-linux-x64.rpm到服务器并安装 # 安装命令 rpm -ivh jd

  • Linux一键安装web环境全攻略(阿里云服务器)

    一键安装包下载: 点此下载 安装须知 1.此安装包可在阿里云所有linux系统上部署安装,此安装包包含的软件及版本为: nginx:1.0.15.1.2.5.1.4.4 apache:2.2.22.2.4.2 mysql:5.1.73.5.5.35.5.6.15 php:5.3.18.5.4.23.5.5.7 php扩展:memcache.Zend Engine/ OPcache ftp:(yum/apt-get安装) phpwind:8.7 GBK phpmyadmin:4.1.8 2.请使用

  • 阿里云云服务器Linux系统FTP服务器搭建设置教程

    一.Redhat/CentOS安装vsftp软件 1.更新yum源 首先需要更新系统的yum源,便捷工具下载地址:http://www.jb51.net/article/54840.htm(shell脚本) 2.安装vsftp 使用yum命令安装vsftp 复制代码 代码如下: #yum install vsftpd -y 3.添加ftp帐号和目录 先检查一下nologin的位置,通常在/usr/sbin/nologin或者/sbin/nologin下. 使用下面的命令创建帐户,该命令指定了/a

  • Linux使用scp远程传输命令进行服务器文件互传(阿里云服务器)

    前几天 WordPress大学 所在的服务器环境不太稳定,CPU 占用总是 100%,折腾了好几次都没有找到原因,只好考虑重新配置环境,考虑到配置和优化好环境需要几个小时,总不能将网站停掉吧,所以要将 WordPress大学 网站临时迁移到另外的服务器运行,配置好再迁回来. 看来下阿里云的两个云服务器,都是同一个区的,所以决定通过内网进行文件传输,速度快不说(正常传输文件的速度能达到35m~50m/s),还不占用公网带宽. 阿里云云服务器有一个安全组策略,可能需要设置下 内网入.出方向的IP白名

  • 云服务器(Linux)安装部署Kafka的详细过程

    目录 云服务器(Linux)安装部署Kafka 前期准备 下载安装包 上载安装包到云服务器 配置kafka 开放云服务器端口 开放linux防火墙端口 启动kafka服务 测试单机连通性 Springboot连接kafak 云服务器(Linux)安装部署Kafka 前期准备 kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8. 下载安装包 官网地址: 较新的版本已自带Zookeeper,无需额外下载.这里使用3.2.0做演示. 注意要下载Binary dow

  • Linux阿里云服务器中安装Nginx命令的详细过程

    目录 1. 安装nginx依赖 2. wget下载nginx 3. 解压缩 4. 自动配置 5. 执行make命令 6. 配置环境变量 7. 查看nginx错误日志 1. 安装nginx依赖 yum -y install gcc zlib zlib-devel pcre-devel openssl openssl-devel 2. wget下载nginx 在新建的nginx文件夹中下载nginx,版本可以在Nginx官网自选我选择了当前时间点官网中最新最稳定的版本 wget http://ngi

  • Centos8.2云服务器环境安装Tomcat8.5的详细教程

    在安装Tomcat之前,先安装好JDK环境 可以看下我以前写的安装JDK的文章及环境配置 关于Linux服务器配置java环境遇到的问题 去Apache官网下载Tomcat8.5的安装包,https://tomcat.apache.org/ 也可以选择其他版本的 Tomcat 这里选择 Tomcat8,https://tomcat.apache.org/download-80.cgi wget 命令+复制的下载地址 wget https://mirrors.bfsu.edu.cn/apache/

  • 阿里云服务器(windows)手动部署FTP站点详细教程

    阿里云服务器(windows)手动部署FTP站点,供大家参考,具体内容如下 介绍:当我们想要将本地电脑的文件更快速的上传到我们部署的win服务器上,就需要用到FTP站点进行文件的传输 准备条件: 1.有一台实例的云服务器 ECS 2.安装有 Windows Server 2019 64位 步骤一:在远程服务器上添加IIS以及FTP服务角色 1.远程连接到服务器并成功登录到win界面 2.点击win图标,点击服务器管理器 3.点击右上角的管理以及下面的添加角色和功能 4.出现下图的模态框按数字标识

  • docker安装部署 onlyoffice的详细过程

    0. 系统要求 中央处理器 I5-10400F以上 内存 16 GB,最佳32G内存 硬盘 至少40 GB的可用空间 1:安装Docker Desktop 2:进入BIOS设置CPU的虚拟化 https://jingyan.baidu.com/article/ab0b56305f2882c15afa7dda.html 3:启动Docker desktop 报错解决:https://www.jb51.net/article/214820.htm 3:cmd进入,使用命令安装onlyoffice 安

  • Cenots7 离线安装部署PostgreSQL 的详细过程

    目录 1 PostgreSQL源码包下载并复制 1.1PostgreSQL源码包下载: 1.2复制源码包至服务器 2基于PostgreSQL源码安装 2.1解压缩源码 2.2检查环境 指定安装路径 2.3编译 2.4安装 3.postgresql的配置 3.1创建用户和组 3.2创建数据库库文件存储目录.给postgres赋予权限 3.3初始化数据库目录 3.4启动停止postgres14.5 3.4.1启动 3.4.2停止 3.4.3权限不足的解决方法 3.4.4修改管理员密码 3.5开启远程

  • 阿里云服务器部署mongodb的详细过程

    在阿里云上买了个服务器,部署mongodb遇到一些坑,解决办法也是从网上搜集而来,把零零碎碎的整理记录一下. 服务器是:Alibaba Cloud Linux 下载安装 mongodb官网下载实在是太慢,可以从阿里镜像安装:阿里MongoDb镜像 使用yum安装 在/etc/yum.repos.d目录下添加mongodb-org.repo文件 cd /etc/yum.repos.d vim mongodb-org.repo [mogodb-org] name=MongoDB Repository

  • linux安装部署ftp图片服务器的实现方法

    参考Linux中FTP服务器的搭建教程  https://www.jb51.net/article/117779.htm 今天给大家分享linux安装部署ftp图片服务器的教程,感兴趣的朋友一起看看吧! 1.安装http反向代理服务器.安装ftp文件传输组件vsftpd 详细安装及配置参见安装vsftpd.nginx 2.搭建图片服务器环境 2.1 实现的效果 例如,图片通过ftp客户端上传至服务器/home/ftpuser/www/images目录下,我想通过nginx反向代理服务器来访问ft

  • Nginx Linux安装部署详细教程

    一.Nginx简介 Nginx是一个web服务器也可以用来做负载均衡及反向代理使用,目前使用最多的就是负载均衡,具体简介我就不介绍了百度一下有很多,下面直接进入安装步骤 二.Nginx安装 1.下载Nginx及相关组件 Linux系统是Centos 6.5 64位,我直接切换到root用户下安装 进入用户目录下载程序 下载相关组件 [root@localhost src]# wget http://nginx.org/download/nginx-1.10.2.tar.gz 省略安装内容...

  • 阿里云服务器linux系统搭建Tomcat部署Web项目

    整个过程我给它分成四个步骤: 下载并安装jdk 下载并安装Tomcat 配置阿里云服务器信息 部署web项目 使用的工具:Xshell.WinSCP. 没有安装jdk的小伙伴点击上方超链接跳转到安装jdk博客 下载并安装Tomcat 到这个网页下查看最新的镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/tomcat 使用工具Xshell操作Linux系统 移动到home目录下载tomcat 下载 wget https://mirrors.tuna.t

随机推荐