Springboot微服务项目整合Kafka实现文章上下架功能

目录
  • 前言:
  • 一:Kafka消息发送快速入门
    • 1.传递字符串消息
      • (1)发送消息
      • (2)监听消息
      • (3)测试结果
    • 2.传递对象消息
      • (1)修改生产者代码
      • (2)结果测试
  • 二:功能引入
    • 1.需求分析
    • 2.逻辑分析
  • 三:前期准备
    • 1.引入依赖
    • 2.定义常量
    • 3.Kafka配置信息
  • 四:代码实现
    • 1.自媒体端
    • 2.移动端

前言:

1.前面基于Springboot的单体项目介绍已经完结了,至于项目中的其他功能实现我这里就不打算介绍了,因为涉及的知识点不难,而且都是简单的CRUD操作,假如有兴趣的话可以私信我我再看看要不要写几篇文章做个介绍。

2.完成上一阶段的学习,我就投入到了微服务的学习当中,所用教程为B站上面黑马的微服务教程。由于我的记性不是很好,所以对于新事物的学习我比较喜欢做笔记以加强理解,在这里我会将笔记的重点内容做个总结发布到“微服务学习”笔记栏目中。我是赵四,一名有追求的程序员,希望大家能多多支持,能给我点个关注就更好了。

一:Kafka消息发送快速入门

1.传递字符串消息

(1)发送消息

创建一个Controller包并编写一个测试类用于发送消息

package com.my.kafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("hello")
    public String helloProducer(){
        kafkaTemplate.send("my-topic","Hello~");
        return "ok";
    }
}

(2)监听消息

编写测试类用于接收消息:

package com.my.kafka.listener;

import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class HelloListener {
    @KafkaListener(topics = "my-topic")
    public void helloListener(String message) {
        if(StringUtils.isNotBlank(message)) {
            System.out.println(message);
        }
    }
}

(3)测试结果

打开浏览器输入localhost:9991/hello,然后到控制台查看消息,可以看到成功消息监听到并且进行了消费。

2.传递对象消息

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式:

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不做介绍。

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式。

(1)修改生产者代码

@GetMapping("hello")
public String helloProducer(){
    User user = new User();
    user.setName("赵四");
    user.setAge(20);
    kafkaTemplate.send("my-topic", JSON.toJSONString(user));
    return "ok";
}

(2)结果测试

可以看到成功接收都对象参数,后期要使用该对象只需要将其转换成User对象即可。

二:功能引入

1.需求分析

发布文章之后,可能会由于文章出现某些错误或者其他原因,我们会在文章管理端实现文章的上下架功能(见下图),也即当管理端实现对文章下架之后移动端将不会再展示该文章,只有该文章重新被上架之后才能在移动端看到该文章信息。

2.逻辑分析

后端接收到前端传过来的参数之后要先做一个校验,参数不为空才能继续往下执行,首先应该根据前端传过来的文章id(自媒体端文章id)查询自媒体数据库的文章信息并判断该文章是否已是发布状态,因为只有审核成功并成功发布了的文章才能进行上下架操作。自媒体端微服务对文章上下架状态进行修改之后便可以向Kafka发送一条消息,该消息为Map对象,里面存储的数据为移动端的文章id以及前端传过来的上下架参数enable,当然要将该Map对象转换成JSON字符串才能进行发送。

文章微服务监听到Kafka发送过来的消息之后将JSON字符串转换成Map对象之后再获取相关参数对移动端文章的上下架状态进行修改。

三:前期准备

1.引入依赖

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2.定义常量

package com.my.common.constans;
public class WmNewsMessageConstants {
    public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}

3.Kafka配置信息

由于我是用Nacos来作为注册中心,所以配置信息放置在Nacos上面即可。

(1)自媒体端配置

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

(2)移动端配置

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

四:代码实现

1.自媒体端

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 文章下架或上架
 * @param id
 * @param enable
 * @return
 */
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
    log.info("执行文章上下架操作...");
    if(id == null || enable == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    //根据id获取文章
    WmNews news = getById(id);
    if(news == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
    }
    //获取当前文章状态
    Short status = news.getStatus();
    if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");
    }

    //更改文章状态
    news.setEnable(enable.shortValue());
    updateById(news);
    log.info("更改文章上架状态{}-->{}",status,news.getEnable());

    //发送消息到Kafka
    Map<String, Object> map = new HashMap<>();
    map.put("articleId",news.getArticleId());
    map.put("enable",enable.shortValue());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
    log.info("发送消息到Kafka...");

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

2.移动端

(1)设置监听器

package com.my.article.listener;

import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;

@Slf4j
@Component
public class EnableListener {
    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void downOrUp(String message) {
        if(StringUtils.isNotBlank(message)) {
            log.info("监听到消息{}",message);
            apArticleService.downOrUp(message);
        }
    }
}

(2)获取消息并修改文章状态

/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
    Map map = JSON.parseObject(message, Map.class);
    //获取文章id
    Long articleId = (Long) map.get("articleId");
    //获取文章待修改状态
    Integer enable = (Integer) map.get("enable");
    //查询文章配置
    ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
            (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
    if(apArticleConfig != null) {
        //上架
        if(enable == 1) {
            log.info("文章重新上架");
            apArticleConfig.setIsDown(false);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
        //下架
        if(enable == 0) {
            log.info("文章下架");
            apArticleConfig.setIsDown(true);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
    }
    else {
        throw new RuntimeException("文章信息不存在");
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

到此这篇关于Springboot微服务项目整合Kafka实现文章上下架功能的文章就介绍到这了,更多相关Springboot整合Kafka内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Springboot集成Kafka进行批量消费及踩坑点

    目录 引入依赖 创建配置类 Kafka 消费者 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> </dependency> 因为我的项目的 springboot 版本是 1.5.22.RELE

  • SpringBoot+Nacos+Kafka微服务流编排的简单实现

    目录 前言 准备工作 Nacos安装及使用入门 准备三个SpringBoot服务,引入Nacos及Kafka 业务解读 Nacos配置 创建配置 读取配置 监听配置改变 总结 前言 最近一直在做微服务开发,涉及了一些数据处理模块的开发,每个处理业务都会开发独立的微服务,便于后面拓展和流编排,学习了SpringCloud Data Flow等框架,感觉这个框架对于我们来说太重了,维护起来也比较麻烦,于是根据流编排的思想,基于我们目前的技术栈实现简单的流编排功能. 简单的说,我们希望自己的流编排就是

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • SpringBoot整合kafka遇到的版本不对应问题及解决

    目录 SpringBoot整合kafka遇到版本不对应 如果你的SpringBoot是2.0.3版本 如果你的SpringBoot比较新,用的2.1.0版本 SpringBoot整合kafka问题的注意事项 SpringBoot整合kafka遇到版本不对应 SpringBoot 整合 kafka 需要在SpringBoot项目里增加kafka的jar,而最为关键的一点是版本要对应好. 如果你的SpringBoot是2.0.3版本     <parent>        <groupId&

  • Springboot整合kafka的示例代码

    目录 1. 整合kafka 2. 消息发送 2.1 发送类型 2.2 序列化 2.3 分区策略 3. 消息消费 3.1 消息组别 3.2 位移提交 1. 整合kafka 1.引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 2.设置yml文件 spring:

  • Springboot微服务项目整合Kafka实现文章上下架功能

    目录 前言: 一:Kafka消息发送快速入门 1.传递字符串消息 (1)发送消息 (2)监听消息 (3)测试结果 2.传递对象消息 (1)修改生产者代码 (2)结果测试 二:功能引入 1.需求分析 2.逻辑分析 三:前期准备 1.引入依赖 2.定义常量 3.Kafka配置信息 四:代码实现 1.自媒体端 2.移动端 前言: 1.前面基于Springboot的单体项目介绍已经完结了,至于项目中的其他功能实现我这里就不打算介绍了,因为涉及的知识点不难,而且都是简单的CRUD操作,假如有兴趣的话可以私

  • springboot微服务Lucence实现Mysql全文检索功能

    目录 一.前言 1.1 常规调优手段 1.1.1 加索引 1.1.2 代码层优化 1.1.3 减少关联表查询 1.1.4 分库分表 1.1.5 引入第三方存储 二.一个棘手的问题 2.1 前置准备 2.1.1 创建一张表 2.1.2 插入一些数据 2.2 问题引发 2.2.1 关键字模糊查询 2.2.2 执行计划分析 2.2.3 需求引出 三.lucence与全文检索 3.1 Lucene概念 3.2 全文检索 3.3 Lucene 建立索引的过程 四.基于Lucence解决方案 4.1 需求分

  • SpringCloud基于RestTemplate微服务项目案例解析

    目录 基于RestTemplate微服务项目 一.构建父工程 二.构建serverspringcloud-api(公共子模块) 三.创建部门微服务提供者 四.创建部门微服务消费者 五.总结 基于RestTemplate微服务项目 在写SpringCloud搭建微服务之前,我想先搭建一个不通过springcloud只通过SpringBoot和Mybatis进行模块之间额通讯.然后在此基础上再添加SpringCloud框架. 下面先对案例做个说明 该项目有一个maven父模块,其中里面有三个子模块:

  • 如何利用IDEA搭建SpringBoot项目整合mybatis实现简单的登录功能

    利用闲余时间想自己搭建一个springboot+mybatis的项目,提升一下自己对项目的了解,毕竟自己还是一个小白,在这里为自己创建项目的过程做一个记录,以便以后回忆.同时将搭建中遇到的问题也在这里做记录.如有遇到同样问题的同学,希望能给你一下借鉴. springboot的优势之一就是快速搭建项目,省去了自己导入jar包和配置xml的时间,使用非常方便. 一,搭建项目: 1.打开IDEA,点击File→New→Project...,如图1所示 图1  搭建项目 2.当我们选择project..

  • 基于Pinpoint对SpringCloud微服务项目实现全链路监控的问题

    目录 1.全链路监控的概念 2.pinpoint链路监控组件的介绍 3.使用docker部署pinpoint监控组件 4.在微服务中集成pinpoint-agent 4.1.pinpoint-agent的接入方式 4.2.配置pinpoint-agent 4.3.修改每个微服务程序的Dockerfile接入pinpoint-agent 4.4.先将product商品服务接入到pinpoint观察效果 4.5.将所有的微服务接入到pinpoint系统 5.pinpoint监控系统简单使用 5.1.

  • Docker Compose部署微服务项目上线功能

    目录 一.需求说明 二.效果图 三.项目结构 四.核心源码 ️Java依赖与接口 ️Docker相关源码 五.部署项目 小结 一.需求说明 编写一个SpringBoot + Redis 的微服务项目,并提供 hello接口,每访问一次接口,计数器+1 二.效果图 三.项目结构 目录说明 docker-compose.yml :项目的启动文件,配置编排等 Dockerfile:项目上线所需要的依赖,以及启动方式 四.核心源码 ️Java依赖与接口 依赖文件 pom.xml <?xml versio

  • docker compose运行微服务项目的方法

    目录 1.数据库迁移 2.阅读docker-compose.yml文件 3.将项目打包 1.数据库迁移 将cloud-demo涉及的相关sql导入到Linux上的mysql容器中 2.阅读docker-compose.yml文件 version: "3.2" services: nacos: image: nacos/nacos-server:1.4.1 environment: MODE: standalone ports: - "8848:8848" mysql

  • 浅谈Spring Boot 微服务项目的推荐部署方式

    如果开发过spring boot的程序,应该都知道,使用spring boot官方的maven打包插件(spring-boot-maven-plugin) 来打包,打出来的jar包一般有40M以上. 如果公司的服务器上传带宽不高,那么手动上传一个jar或者jenkins部署一次jar,都是非常痛苦的........ 但是,如果打包的时候不引入lib,那么打出来的jar包一般只有几十k而已,非常小,想怎么传就怎么传......... 本文会提供一个bash启动脚本,只需要稍做更改,即可适应你的程序

  • 浅析SpringBoot微服务中异步调用数据提交数据库的问题

    目录 前言: 一: 同步&异步 1.同步与异步的概念 2.同步方法调用&异步方法调用 2.1:同步方法调用 2.2:异步方法调用 二:问题引入 1.功能需求 2.问题引出 3.问题剖析 三:问题解决 前言: 1.前面基于Springboot的单体项目介绍已经完结了,至于项目中的其他功能实现我这里就不打算介绍了,因为涉及的知识点不难,而且都是简单的CRUD操作,假如有兴趣的话可以私信我我再看看要不要写几篇文章做个介绍. 2.完成上一阶段的学习,我就投入到了微服务的学习当中,所用教程为B站上面

  • springboot+springmvc+mybatis项目整合

    介绍: 上篇给大家介绍了ssm多模块项目的搭建,在搭建过程中spring整合springmvc和mybatis时会有很多的东西需要我们进行配置,这样不仅浪费了时间,也比较容易出错,由于这样问题的产生,Pivotal团队提供了一款全新的框架,该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置.通过这种方式,Spring Boot致力于在蓬勃发展的快速应用开发领域(rapid application development)成为领导者. 特点: 1. 创建独立的Spring应用

随机推荐