Mysql到Elasticsearch高效实时同步Debezium实现

目录
  • 题记
  • 1、 binlog认知
    • 1.1 啥是 binlog?
    • 1.2 阿里的Canal实现了增量Mysql同步
  • 2、基于binlog的同步方式
  • 3、Debezium介绍
  • 4、同步架构
  • 5、Debezium实现Mysql到ES增删改实时同步
    • 5.1 Debezium安装
    • 5.2 Mysql binlog等相关配置。
    • 5.3 配置connector连接器。
    • 5.4 启动connector
    • 5.5 验证写入是否成功。
    • 5.6 验证消费数据验证写入是否正常
  • 6、kafka-connector实现kafka同步Elasticsearch
    • 6.1、Kafka-connector介绍
    • 6.2、kafka到ES connector同步配置
    • 6.3 kafka到ES启动connector
    • 6.4 Kafka-connctor RESTFul API查看
  • 7、坑复盘。
  • 8、小结

题记

来自Elasticsearch中文社区的问题——
MySQL中表无唯一递增字段,也无唯一递增时间字段,该怎么使用logstash实现MySQL实时增量导数据到es中?

logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式增量同步数据。
回到问题本身:如果库表里没有相关字段,该如何处理呢?

本文给出相关探讨和解决方案。

1、 binlog认知

1.1 啥是 binlog?

binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;
其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中;
作用主要有:

1)复制:达到master-slave数据一致的目的。2)数据恢复:通过mysqlbinlog工具恢复数据。3)增量备份。

1.2 阿里的Canal实现了增量Mysql同步

一图胜千言,canal是用java开发的基于数据库增量日志解析、提供增量数据订阅&消费的中间件。

目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。目的:增量数据订阅&消费。

综上,使用binlog可以突破logstash或者kafka-connector没有自增id或者没有时间戳字段的限制,实现增量同步。

2、基于binlog的同步方式

1)基于kafka Connect的Debezium 开源工程,地址:. https://debezium.io/
2)不依赖第三方的独立应用: Maxwell开源项目,地址:http://maxwells-daemon.io/

由于已经部署过conluent(kafka的企业版本,自带zookeeper、kafka、ksql、kafka-connector等),本文仅针对Debezium展开。

3、Debezium介绍

Debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到Kafka,稳定性强且速度非常快。
特点:

1)简单。无需修改应用程序。可对外提供服务。

2)稳定。持续跟踪每一行的每一处变动。

3)快速。构建于kafka之上,可扩展,经官方验证可处理大容量的数据。

4、同步架构

如图,Mysql到ES的同步策略,采取“曲线救国”机制。
步骤1 基Debezium的binlog机制,将Mysql数据同步到Kafka。
步骤2 基于Kafka_connector机制,将kafka数据同步到Elasticsearch。

5、Debezium实现Mysql到ES增删改实时同步

软件版本:

confluent:5.1.2;
Debezium:0.9.2_Final;
Mysql:5.7.x.
Elasticsearch:6.6.1

5.1 Debezium安装

confluent的安装部署参见:http://t.cn/Ef5poZk,不再赘述。

Debezium的安装只需要把debezium-connector-mysql的压缩包解压放到Confluent的解压后的插件目录(share/java)中。

MySQL Connector plugin 压缩包的下载地址:https://debezium.io/docs/install/。

注意重启一下confluent,以使得Debezium生效。

5.2 Mysql binlog等相关配置。

Debezium使用MySQL的binlog机制实现数据动态变化监测,所以需要Mysql提前配置binlog。
核心配置如下,在Mysql机器的/etc/my.cnf的mysqld下添加如下配置。

[mysqld]
server-id         = 223344
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10

然后,重启一下Mysql以使得binlog生效。

systemctl start mysqld.service

5.3 配置connector连接器。

配置confluent路径目录 : /etc
创建文件夹命令 :

mkdir kafka-connect-debezium

在mysql2kafka_debezium.json存放connector的配置信息 :

[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json
{
        "name" : "debezium-mysql-source-0223",
        "config":
        {
             "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
             "database.hostname" : "192.168.1.22",
             "database.port" : "3306",
             "database.user" : "root",
             "database.password" : "XXXXXX",
             "database.whitelist" : "kafka_base_db",
             "table.whitlelist" : "accounts",
             "database.server.id" : "223344",
             "database.server.name" : "full",
             "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",
             "database.history.kafka.topic" : "account_topic",
             "include.schema.changes" : "true" ,
             "incrementing.column.name" : "id",
             "database.history.skip.unparseable.ddl" : "true",
             "transforms": "unwrap,changetopic",
             "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
             "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
             "transforms.changetopic.regex":"(.*)",
             "transforms.changetopic.replacement":"$1-smt"
        }
}

注意如下配置:

“database.server.id”:对应Mysql中的server-id的配置。

“database.whitelist”: 待同步的Mysql数据库名。

“table.whitlelist”:待同步的Mysq表名。

“database.history.kafka.topic”:存储数据库的Shcema的记录信息,而非写入数据的topic

“database.server.name”:逻辑名称,每个connector确保唯一,作为写入数据的kafka topic的前缀名称。

坑1:transforms相关5行配置作用是写入数据格式转换。
如果没有,输入数据会包含:before、after记录修改前对比信息以及元数据信息(source,op,ts_ms等)。

这些信息在后续数据写入Elasticsearch是不需要的。(注意结合自己业务场景)。

格式转换相关原理:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

5.4 启动connector

curl -X POST -H "Content-Type:application/json"
--data @mysql2kafka_debezium.json.json
http://192.168.1.22:18083/connectors | jq

5.5 验证写入是否成功。

查看kafka-topic

kafka-topics --list --zookeeper localhost:2181

此处会看到写入数据topic的信息。

注意新写入数据topic的格式:database.schema.table-smt 三部分组成。

本示例topic名称:full.kafka_base_db.account-smt。

5.6 验证消费数据验证写入是否正常

./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning

至此,Debezium实现mysql同步kafka完成。

6、kafka-connector实现kafka同步Elasticsearch

6.1、Kafka-connector介绍

见官网:https://docs.confluent.io/current/connect.html

Kafka Connect是一个用于连接Kafka与外部系统(如数据库,键值存储,检索系统索引和文件系统)的框架。
连接器实现公共数据源数据(如Mysql、Mongo、Pgsql等)写入Kafka,或者Kafka数据写入目标数据库,也可以自己开发连接器。

6.2、kafka到ES connector同步配置

配置路径:

/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

配置内容:

"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "full.kafka_base_db.account-smt",
"key.ignore": "true",
"connection.url": "http://192.168.1.22:9200",
"type.name": "_doc",
"name": "elasticsearch-sink-test"

6.3 kafka到ES启动connector

启动命令

confluent load  elasticsearch-sink-test
-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

6.4 Kafka-connctor RESTFul API查看

Mysql2kafka,kafka2ES的connector详情信息可以借助postman或者浏览器或者命令行查看。

   curl -X GET http://localhost:8083/connectors

7、坑复盘。

坑2 同步的过程中可能出现错误,比如:kafka topic没法消费到数据。
排解思路如下:

1)确认消费的topic是否是写入数据的topic;2)确认同步的过程中没有出错。可以借助connector如下命令查看。

   curl -X GET http://localhost:8083/connectors-xxx/status

坑3 Mysql2ES出现日期格式不能识别。
是Mysql jar包的问题,解决方案:在my.cnf中配置时区信息即可。

坑4 kafka2ES,ES没有写入数据。
排解思路:

1)建议:先创建同topic名称一致的索引,注意:Mapping静态自定义,不要动态识别生成。

2)通过connetor/status排查出错原因,一步步分析。

8、小结

binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已经实现。

对比:logstash、kafka-connector,虽然Debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。

参考:
[1] https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/
[2] https://www.smwenku.com/a/5c0a7b61bd9eee6fb21356a1/zh-cn
[3] https://juejin.im/post/5b7c036bf265da43506e8cfd
[4] https://debezium.io/docs/connectors/mysql/#configuration
[5] https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc

以上就是Mysql到Elasticsearch高效实时同步Debezium实现的详细内容,更多关于Mysql到Elasticsearch同步的资料请关注我们其它相关文章!

(0)

相关推荐

  • MySQL 与 Elasticsearch 数据不对称问题解决办法

    MySQL 与 Elasticsearch 数据不对称问题解决办法 jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作.这样一来数据库与搜索引擎的数据库就出现了不对称的情况. 当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作.如果你没有这个能力,可以尝试下面的方法. 这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TI

  • 详解MySQL的半同步

    前言 年后在进行腾讯二面的时候,写完算法的后问的第一个问题就是,MySQL的半同步是什么?我当时直接懵了,我以为是问的MySQL的两阶段提交的问题呢?结果确认了一下后不是两阶段提交,然后面试官看我连问的是啥都不知道,就直接跳过这个问题,直接聊下一个问题了.所以这次总结一下这部分的知识内容,文字内容比较多,可能会有些枯燥,但对于这方面感兴趣的人来说还是比较有意思的. MySQL的主从复制 我们的一般在大规模的项目上,都是使用MySQL的复制功能来创建MySQL的主从集群的.主要是可以通过为服务器配

  • 详解Mysql如何实现数据同步到Elasticsearch

    目录 一.同步原理 二.logstash-input-jdbc 三.go-mysql-elasticsearch 四.elasticsearch-jdbc 五.logstash-input-jdbc实现同步 六.go-mysql-elasticsearch实现同步 七.elasticsearch-jdbc实现同步 一.同步原理 基于Mysql的binlog日志订阅:binlog日志是Mysql用来记录数据实时的变化 Mysql数据同步到ES中分为两种,分别是全量同步和增量同步 全量同步表示第一次

  • MySQL特定表全量、增量数据同步到消息队列-解决方案

    目录 1.原始需求 2.解决方案 3.canal介绍.安装 canal的工作原理 架构 安装 4.验证 1.原始需求 既要同步原始全量数据,也要实时同步MySQL特定库的特定表增量数据,同时对应的修改.删除也要对应. 数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力. 应用场景:数据ETL同步.降低业务服务器压力. 2.解决方案 3.canal介绍.安装 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了M

  • Mysql到Elasticsearch高效实时同步Debezium实现

    目录 题记 1. binlog认知 1.1 啥是 binlog? 1.2 阿里的Canal实现了增量Mysql同步 2.基于binlog的同步方式 3.Debezium介绍 4.同步架构 5.Debezium实现Mysql到ES增删改实时同步 5.1 Debezium安装 5.2 Mysql binlog等相关配置. 5.3 配置connector连接器. 5.4 启动connector 5.5 验证写入是否成功. 5.6 验证消费数据验证写入是否正常 6.kafka-connector实现ka

  • 如何使用Maxwell实时同步mysql数据

    Maxwell简介 maxwell是由java编写的守护进程,可以实时读取mysql binlog并将行更新以JSON格式写入kafka.rabbitMq.redis等中,  这样有了mysql增量数据流,使用场景就很多了,比如:实时同步数据到缓存,同步数据到ElasticSearch,数据迁移等等. maxwell官网:http://maxwells-daemon.io  maxwell源代码:https://github.com/zendesk/maxwell Maxwell的配置与使用 m

  • 使用canal监控mysql数据库实现elasticsearch索引实时更新

    业务场景 使用elasticsearch作为全文搜索引擎,对标题.内容等,实现智能搜索.输入提示.拼音搜索等 elasticsearch索引与数据库数据不一致,导致搜索到不应被搜到的结果,或者搜不到已有数据 索引相关业务,影响其他业务操作,如索引删除失败导致数据库删除失败 为了减少对现有业务的侵入,基于数据库层面,对信息表进行监控,但需要索引的字段变动时,更新索引 由于使用的是mysql数据库,故决定采用alibaba的canal中间件 主要是监控信息基表base,监控这一张表的数据变动,mq消

  • 详解MySQL实时同步到Oracle解决方案

    1 需求概述 将MySQL5.6生产库多张表的数据实时同步到Oracle11g数据仓库,MySQL历史数据700G,平均每天产生50G左右日志文件,MySQL日志空间50G,超过后滚动删除日志文件.整个同步过程不可影响MySQL业务操作. 2 技术原理 采用灵蜂数据集成软件BeeDI将MySQL数据实时同步到Oracle,通过ETL全量同步历史数据,通过日志解析方式实时同步增量数据. 受限于日志空间,如果将所有历史数据一次性同步,需要的时间会超过一天,全量同步过程产生的日志会被删除,造成实时日志

  • 使用canal监控mysql数据库实现elasticsearch索引实时更新问题

    目录 业务场景 安装 下载安装 数据库启用row binlog 使用 修改配置文件canal.properties 配置单个连接 配置多个连接 配置rabbitMQ 程序改动 canal源码 微服务消费mq 业务场景 使用elasticsearch作为全文搜索引擎,对标题.内容等,实现智能搜索.输入提示.拼音搜索等 elasticsearch索引与数据库数据不一致,导致搜索到不应被搜到的结果,或者搜不到已有数据 索引相关业务,影响其他业务操作,如索引删除失败导致数据库删除失败 为了减少对现有业务

  • inotify-tools+rsync实时同步文件的配置方法

    服务器A:论坛的主服务器,运行DZ X2论坛程序;服务器B:论坛从服务器,需要把X2的图片附件和MySQL数据实时从A主服务器实时同步到B服务器.MySQL同步设置会在下一编中说到.以下是用于实时同步两台服务器的图片. 因为一般的RSYNC需要CRON来定期运行SH脚本来实现同步,这样会带来一些问题.比如用户从主服务器上传上一个图片,需要最少一分钟才能从从服务器显示出来.自从Linux 2.6内核后,支持了inotify机制,当某些文件或文件夹有改变时,发出相应的事件,这样,第三方程序只要订阅这

  • sersync实现数据实时同步的方法

    1.1 第一个里程碑:安装sersync软件 1.1.1 将软件上传到服务器当中并解压 1.上传软件到服务器上 rz -E 为了便于管理上传位置统一设置为 /server/tools 中 2.解压软件包 [root@backup sersync_installdir_64bit]# tree . └── sersync ├── bin │ └── sersync ├── conf │ └── confxml.xml └── logs 1.1.2 二进制包安装方法 二进制包安装软件方法(绿色软件安

  • linux系统中rsync+inotify实现服务器之间文件实时同步

    之前做了"ssh信任与scp自动传输脚本"的技术文档,此方案是作为公司里备份的方法,但在实际的运行中,由于主服务器在给备份服务器传输的时候,我们的主服务器需要备份的文件是实时.不停的产生的,造成不知道主服务器给备份服务器传输了多少文件,磁盘空间就那么大,做备份的原因:一个是为了保持文件,另外一个是解决主服务器的磁盘饱满问题,但由于不知道备份服务器到底接收了多少文件,所以主服务器里的文件不敢删除(如果没有备份的情况下删除,问题就严重了,我这个是政府的项目,服务器里的文件都是重要的,删错了

  • SQL Server实时同步更新远程数据库遇到的问题小结

    工作中遇到这样的情况,需要在更新表TableA(位于服务器ServerA 172.16.8.100中的库DatabaseA)同时更新TableB(位于服务器ServerB 172.16.8.101中的库DatabaseB). TableA与TableB结构相同,但数据数量不一定相同,应为有可能TableC也在更新TableB.由于数据更新不频繁,为简单起见想到使用了触发器Tirgger.记录一下遇到的一些问题: 1. 访问异地数据库 在ServerA 中创建指向ServerB的链接服务器,并做好

随机推荐