Spring Boot整合阿里开源中间件Canal实现数据增量同步

目录
  • 前言
  • Canal是什么?
  • Canal数据如何传输?
  • 数据同步还有其他中间件吗?
  • Canal服务端安装
    • 1、打开MySQL的binlog日志
    • 2、设置MySQL的配置
    • 3、设置RabbitMQ的配置
    • 4、RabbitMQ新建exchange和Queue
    • 5、启动服务端
    • 6、测试
  • Canal客户端搭建
    • 1、创建消息实体类
    • 2、MQ消息监听业务
    • 3、测试
  • 总结

前言

数据同步一直是一个令人头疼的问题。在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方。

但是随着业务量增大,数据量变多以及各种复杂场景下的分库分表的实现,使数据同步变得越来越困难。

今天这篇文章使用阿里开源的中间件Canal解决数据增量同步的痛点。

文章目录如下:

Canal是什么?

canal译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

从这句话理解到了什么?

基于MySQL,并且通过MySQL日志进行的增量解析,这也就意味着对原有的业务代码完全是无侵入性的。

工作原理:解析MySQL的binlog日志,提供增量数据。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

官方文档:github.com/alibaba/can…

Canal数据如何传输?

先来一张官方图:

Canal分为服务端和客户端,这也是阿里常用的套路,比如前面讲到的注册中心Nacos:

  • 服务端:负责解析MySQL的binlog日志,传递增量数据给客户端或者消息中间件
  • 客户端:负责解析服务端传过来的数据,然后定制自己的业务处理。

目前为止支持的消息中间件很全面了,比如Kafka、RocketMQ,RabbitMQ。

数据同步还有其他中间件吗?

有,当然有,还有一些开源的中间件也是相当不错的,比如Bifrost。

常见的几款中间件的区别如下:

当然要我选择的话,首选阿里的中间件Canal。

Canal服务端安装

服务端需要下载压缩包,下载地址:github.com/alibaba/can…

目前最新的是v1.1.5,点击下载:

下载完成解压,目录如下:

本文使用Canal+RabbitMQ进行数据的同步,因此下面步骤完全按照这个base进行。

1、打开MySQL的binlog日志

修改MySQL的日志文件,my.cnf 配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2、设置MySQL的配置

需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。

一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。

修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# 监听的数据库
canal.instance.defaultDatabaseName=test
# 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
canal.instance.filter.regex=.*\\..*

3、设置RabbitMQ的配置

服务端默认的传输方式是tcp,需要在配置文件中设置MQ的相关信息。

这里需要修改两处配置文件,如下;

1、canal.deployer-1.1.5\conf\canal.properties

这个配置文件主要是设置MQ相关的配置,比如URL,用户名、密码...

# 传输方式:tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host =/
# exchange
rabbitmq.exchange =canal.exchange
# 用户名、密码
rabbitmq.username =guest
rabbitmq.password =guest
## 是否持久化
rabbitmq.deliveryMode = 2

2、canal.deployer-1.1.5\conf\example\instance.properties

这个文件设置MQ的路由KEY,这样才能路由到指定的队列中,如下:

canal.mq.topic=canal.routing.key

4、RabbitMQ新建exchange和Queue

在RabbitMQ中需要新建一个canal.exchange(必须和配置中的相同)的exchange和一个名称为 canal.queue(名称随意)的队列。

其中绑定的路由KEY为:canal.routing.key(必须和配置中的相同),如下图:

5、启动服务端

点击bin目录下的脚本,windows直接双击startup.bat,启动成功如下:

6、测试

在本地数据库test中的oauth_client_details插入一条数据,如下:

INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');

此时查看MQ中的canal.queue已经有了数据,如下:

其实就是一串JSON数据,这个JSON如下:

{
	"data": [{
		"client_id": "myjszl",
		"resource_ids": "res1",
		"client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
		"scope": "all",
		"authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit",
		"web_server_redirect_uri": "http://www.baidu.com",
		"authorities": null,
		"access_token_validity": "1000",
		"refresh_token_validity": "1000",
		"additional_information": null,
		"autoapprove": "false"
	}],
	"database": "test",
	"es": 1640337532000,
	"id": 7,
	"isDdl": false,
	"mysqlType": {
		"client_id": "varchar(48)",
		"resource_ids": "varchar(256)",
		"client_secret": "varchar(256)",
		"scope": "varchar(256)",
		"authorized_grant_types": "varchar(256)",
		"web_server_redirect_uri": "varchar(256)",
		"authorities": "varchar(256)",
		"access_token_validity": "int(11)",
		"refresh_token_validity": "int(11)",
		"additional_information": "varchar(4096)",
		"autoapprove": "varchar(256)"
	},
	"old": null,
	"pkNames": ["client_id"],
	"sql": "",
	"sqlType": {
		"client_id": 12,
		"resource_ids": 12,
		"client_secret": 12,
		"scope": 12,
		"authorized_grant_types": 12,
		"web_server_redirect_uri": 12,
		"authorities": 12,
		"access_token_validity": 4,
		"refresh_token_validity": 4,
		"additional_information": 12,
		"autoapprove": 12
	},
	"table": "oauth_client_details",
	"ts": 1640337532520,
	"type": "INSERT"
}

每个字段的意思已经很清楚了,有表名称、方法、参数、参数类型、参数值.....

客户端要做的就是监听MQ获取JSON数据,然后将其解析出来,处理自己的业务逻辑。

Canal客户端搭建

客户端很简单实现,要做的就是消费Canal服务端传递过来的消息,监听canal.queue这个队列。

1、创建消息实体类

MQ传递过来的是JSON数据,当然要创建个实体类接收数据,如下:

/**
 * @author 公号 码猿技术专栏
 * Canal消息接收实体类
 */
@NoArgsConstructor
@Data
public class CanalMessage<T> {
    @JsonProperty("type")
    private String type;
    @JsonProperty("table")
    private String table;
    @JsonProperty("data")
    private List<T> data;
    @JsonProperty("database")
    private String database;
    @JsonProperty("es")
    private Long es;
    @JsonProperty("id")
    private Integer id;
    @JsonProperty("isDdl")
    private Boolean isDdl;
    @JsonProperty("old")
    private List<T> old;
    @JsonProperty("pkNames")
    private List<String> pkNames;
    @JsonProperty("sql")
    private String sql;
    @JsonProperty("ts")
    private Long ts;
}

2、MQ消息监听业务

接下来就是监听队列,一旦有Canal服务端有数据推送能够及时的消费。

代码很简单,只是给出个接收的案例,具体的业务逻辑可以根据业务实现,如下:

import cn.hutool.json.JSONUtil;
import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * 监听MQ获取Canal增量的数据消息
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange"),
                    key = "canal.routing.key"
            )
    })
    public void handleDataChange(String message) {
        //将message转换为CanalMessage
        CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
        String tableName = canalMessage.getTable();
        log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
        //TODO 业务逻辑自己完善...............
    }
}

3、测试

下面向表中插入数据,看下接收的消息是什么样的,SQL如下:

INSERT INTO `oauth_client_details`
VALUES
	( 'myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false' );

客户端转换后的消息如下图:

上图可以看出所有的数据都已经成功接收到,只需要根据数据完善自己的业务逻辑即可。

源码地址:https://github.com/chenjiabing666/JavaFamily/tree/master/spring-security/middleware/canal-mq-boot

总结

数据增量同步的开源工具并不只有Canal一种,根据自己的业务需要选择合适的组件。

以上就是Spring Boot整合阿里开源中间件Canal实现数据增量同步的详细内容,更多关于Spring Boot整合Canal数据同步的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot整合canal实现数据同步的示例代码

    目录 一.前言 二.docker-compose部署canal 三.canal-admin可视化管理 四.springboot整合canal实现数据同步 五.canal-spring-boot-starter 一.前言 canal:阿里巴巴 MySQL binlog 增量订阅&消费组件https://github.com/alibaba/canal tips: 环境要求和配置参考 https://github.com/alibaba/canal/wiki/AdminGuide 这里额外提下Red

  • Springboot2.3.x整合Canal的示例代码

    目录 一.故事背景 二.什么是Canal 三.Canal安装 (1)事前准备 (1)数据库开启binlog (2)数据库新建账号,开启MySQLslav权限 (2)CanalAdmin安装 (3)CanalServer安装 (4)springbootdemo示例 一.故事背景 前言… 最近工作中遇到了一个数据同步的问题 我们这边系统的一个子业务需要依赖另一个系统的数据,当另一个系统数据变更时,我们这边的数据库要对数据进行同步… 那么我自己想到的同步方式呢就两种: 1.MQ订阅,另一个系统数据变更

  • springboot整合freemarker的踩坑及解决

    目录 springboot整合freemarker踩坑 报错 问题原因 解决方法 springboot freemarker基础配置及使用 1.基础配置 2.基础使用 springboot整合freemarker踩坑 报错 2021-04-23 02:01:18.148 ERROR 9484 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatc

  • 关于SpringBoot整合Canal数据同步的问题

    目录 1.CentOS7编译安装MySQL5.7.24 2.Mysql设置binLog配置 3.Linux下载安装Canal服务 4.Boot项目中引入依赖 5.修改properties配置文件 6.修改Application启动类 7.创建Canal配置类自动监听 1.CentOS7编译安装MySQL5.7.24 CentOS7编译安装MySQL5.7.24的教程详解 链接地址:https://www.jb51.net/article/152246.htm 2.Mysql设置binLog配置

  • springboot 整合canal实现示例解析

    目录 前言 环境准备 一.快速搭建canal服务 搭建步骤 1.服务器使用docker快速安装一个mysql并开启binlog日志 2.上传canal安装包并解压 3.进入到第二步解压后的文件目录,并修改配置文件 4.启动canal服务 二.与springboot整合 1.Java中使用canal 2.编写一个demo 3.与springboot整合 4.application.yml 配置文件 5.核心工具类 6.提供一个配置类,在程序启动后监听数据变化 7.启动类 前言 在Mysql到Ela

  • Springboot整合企业微信机器人助手推送消息的实现

    目录 前言 本篇内容: 正文 机器人创建步骤: 前言 这个东西有啥用,好玩? 确实, 好玩归好玩,其实很有使用场景. 可以自己选则一些业务节点触发这个机器人助手的消息推送: 简单举例: 1. 有人给你的系统留下反馈意见了,推送到运营群去: 2.项目部署成功了,推送到运维群去: 3.有人新增业务资料了,推送到客服群去: 本篇内容: 对接企微机器人,推送消息到群聊. 消息类型有四种: 文本消息 图片消息 MarkDown格式文本消息 小卡片消息(小卡片哦~) 效果: 正文 注意点: 1.企业微信群聊

  • 使用VUE+SpringBoot+EasyExcel 整合导入导出数据的教程详解

    目录 1 前端 2 数据库 3 后端 3.1 contrller 3.2 mapper 3.3 bean 3.4 listener 3.5 config 3.6 配置文件 4 启动测试 创建一个普通的maven项目即可 项目目录结构 1 前端 存放在resources/static 下 index.html <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8&quo

  • Spring Boot整合阿里开源中间件Canal实现数据增量同步

    目录 前言 Canal是什么? Canal数据如何传输? 数据同步还有其他中间件吗? Canal服务端安装 1.打开MySQL的binlog日志 2.设置MySQL的配置 3.设置RabbitMQ的配置 4.RabbitMQ新建exchange和Queue 5.启动服务端 6.测试 Canal客户端搭建 1.创建消息实体类 2.MQ消息监听业务 3.测试 总结 前言 数据同步一直是一个令人头疼的问题.在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例

  • Spring Boot 整合 Fisco Bcos的案例分析(区块链)

    目录 简介 本地环境 主要流程: 1.Fisco Bcos环境搭建与验证 1.1.搭建单群组4节点联盟链: 1.2.检查证书 1.3.使用证书验证节点正确性 2.创建SpringBoot工程并配置依赖 2.1.创建SpringBoot工程: 2.2.配置pom.xml 3.2.配置节点证书: 3.3.编写controller 3.4.DemoBcosApplication默认不做修改 3.5.application.properties什么也没配置 4.生成jar包.部署服务器验证 4.1.本地

  • Spring Boot整合Elasticsearch实现全文搜索引擎案例解析

    简单说,ElasticSearch(简称 ES)是搜索引擎,是结构化数据的分布式搜索引擎.Elastic Search是一个开源的,分布式,实时搜索和分析引擎.Spring Boot为Elasticsearch及Spring Data Elasticsearch提供的基于它的抽象提供了基本的配置.Spring Boot提供了一个用于聚集依赖的spring-boot-starter-data-elasticsearch 'StarterPOM'. 引入spring-boot-starter-dat

  • spring boot整合mybatis利用Mysql实现主键UUID的方法

    前言 本文主要给大家介绍了关于spring boot整合mybatis利用Mysql实现主键UUID的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 实现 基础项目的pom.xml部分代码如下 <properties> <java.version>1.8</java.version> </properties> <!-- Inherit defaults from Spring Boot --> <parent&

  • Spring Boot 整合 Druid过程解析

    这篇文章主要介绍了Spring Boot 整合 Druid过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 概述 Druid 是阿里巴巴开源平台上的一个项目,整个项目由数据库连接池.插件框架和 SQL 解析器组成.该项目主要是为了扩展 JDBC 的一些限制,可以让程序员实现一些特殊的需求,比如向密钥服务请求凭证.统计 SQL 信息.SQL 性能收集.SQL 注入检查.SQL 翻译等,程序员可以通过定制来实现自己需要的功能. Druid 是

  • Spring Boot整合EhCache的步骤详解

    本文讲解Spring Boot与EhCache的整合. 1 EhCache简介 EhCache 是一个纯Java的进程内缓存框架,具有快速.精干等特点,是Hibernate中默认CacheProvider.Ehcache是一种广泛使用的开源Java分布式缓存.主要面向通用缓存,Java EE和轻量级容器.它具有内存和磁盘存储,缓存加载器,缓存扩展,缓存异常处理程序,一个gzip缓存servlet过滤器,支持REST和SOAP api等特点. 2 Spring Boot整合EhCache步骤 2.

  • Spring Boot整合QueryDSL的实现示例

    之前研究Jooq,今天来研究一下搭配JPA的QueryDSL吧. 简介 Querydsl是一个Java开源框架用于构建类型安全的SQL查询语句.它采用API代替拼凑字符串来构造查询语句.可跟 Hibernate 和 JPA 等框架结合使用. 新建Spring Boot项目 ...还说啥? 1. pom.xml <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <

  • Spring Boot 整合 Fisco Bcos部署、调用区块链合约的案例

    本篇文章介绍 Spring Boot 整合 Fisco Bcos 的相关技术(合约的调用) 简介 在上一节,介绍了Spring Boot 整合 Fisco BCOS的最最基础的案例(SpringBoot成功连接Fisco BCOS,并访问其节点网络 --> 文章链接). 本节,咱们继续介绍Spring Boot 整合 Fisco BCOS的最最重要的技术点:部署.调用区块链合约(基于上一节的工程案例). 主要流程 --> 1.合约开发(省略,因为是案例演示,直接使用 Fisco BCOS自带的

  • spring boot整合log4j2及MQ消费处理系统日志示例

    目录 前言 1.添加相关jar依赖 2.系统log4j2.xml配置 3.添加处理日志的消息监听 前言 当系统的并发比较高的时候,日志的处理输出也是一种性能的开销负担,所以,选择一个中间件来处理消费日志必不可少! 下面是spring boot整合log4j2结合spring amqp来消费处理系统日志的实例,只需要简单的三步 1.添加相关jar依赖 <dependency> <groupId>org.springframework.boot</groupId> <

  • Spring Boot 整合 FreeMarker 实例分享

    目录 一.前言 二.FreeMarker 简介 三.准备工作 环境准备 添加 FreeMarker 依赖 添加 FreeMarker 相关配置 四.编写实体类和 Controller 编写实体类 编写 Controller 数据渲染 五.总结 一.前言 在之前的文章Spring Boot 整合 Thymeleaf 实例分享中,我们学习了如何将模板 Thymeleaf 整合到 Spring Boot 中,那今天我们就来看看,另一个老牌的开源免费模板引擎 - FreeMarker! 二.FreeMa

随机推荐