MySQL 与 Elasticsearch 数据不对称问题解决办法

MySQL 与 Elasticsearch 数据不对称问题解决办法

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field    | Type     | Null | Key | Default            | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id     | int(11)   | NO  |   | 0               |    |
| title    | mediumtext  | NO  |   | NULL              |    |
| description | mediumtext  | YES |   | NULL              |    |
| author   | varchar(100) | YES |   | NULL              |    |
| source   | varchar(100) | YES |   | NULL              |    |
| content   | longtext   | YES |   | NULL              |    |
| status   | enum('Y','N')| NO  |   | 'N'              |    |
| ctime    | timestamp  | NO  |   | CURRENT_TIMESTAMP       |    |
| mtime    | timestamp  | YES |   | ON UPDATE CURRENT_TIMESTAMP  |    |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

jdbc {
  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
  jdbc_user => "cms"
  jdbc_password => "password"
  schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次
  statement => "select * from article where mtime > :sql_last_value"
  use_column_value => true
  tracking_column => "mtime"
  tracking_column_type => "timestamp"
  record_last_run => true
  last_run_metadata_path => "/var/tmp/article-mtime.last"
 }

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

CREATE TABLE `elasticsearch_trash` (
 `id` int(11) NOT NULL,
 `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
 -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。
 IF NEW.status = 'N' THEN
 insert into elasticsearch_trash(id) values(OLD.id);
 END IF;
 -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。
  IF NEW.status = 'Y' THEN
 delete from elasticsearch_trash where id = OLD.id;
 END IF;
END

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
 -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。
 insert into elasticsearch_trash(id) values(OLD.id);
END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体

package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class ElasticsearchTrash {
 @Id
 private int id;

 @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
 private Date ctime;

 public int getId() {
 return id;
 }

 public void setId(int id) {
 this.id = id;
 }

 public Date getCtime() {
 return ctime;
 }

 public void setCtime(Date ctime) {
 this.ctime = ctime;
 }

}

仓库

package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{

}

定时任务

package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component
public class ScheduledTasks {
 private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

 @Autowired
 private TransportClient client;

 @Autowired
 private ElasticsearchTrashRepository alasticsearchTrashRepository;

 public ScheduledTasks() {
 }

 @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务
 public void cleanTrash() {
 for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
  DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
  RestStatus status = response.status();
  logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
  if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
  alasticsearchTrashRepository.delete(elasticsearchTrash);
  }
 }
 }
}

Spring boot 启动主程序。

package cn.netkiller.api;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

 public static void main(String[] args) {
 SpringApplication.run(Application.class, args);
 }
}

以上就是MySQL 与 Elasticsearch 数据不对称问题解决办法的讲解,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

(0)

相关推荐

  • MySQL提示:The server quit without updating PID file问题的解决办法

    用df命令查了下,果然磁盘满了,因为当时分区采用系统默认,不知道为什么不能自动扩容!以后在处理这个问题!如图所示: 复制代码 代码如下: [root@snsgou ~]# df文件系统                 1K-块      已用      可用 已用% 挂载点/dev/mapper/vg_snsgou-lv_root51606140  47734848   1249852  100%      /tmpfs                  1953396        88  

  • windows下MySQL5.6版本安装及配置过程附有截图和详细说明

                        编辑者:Vocabulary 下面详细介绍5.6版本MySQL的下载.安装及配置过程. 图1.1 MySQL5.6 目前针对不同用户,MySQL提供了2个不同的版本: Ø         MySQL Community Server:社区版,该版本完全免费,但是官方不提供技术支持. Ø         MySQL Enterprise Server:企业版,它能够高性价比的为企业提供数据仓库应用,支持ACID事物处理,提供完整的提交.回滚.崩溃恢复和行级锁

  • mysql update语句的用法详解

    首先,单表的UPDATE语句: UPDATE [LOW_PRIORITY] [IGNORE] tbl_name SET col_name1=expr1 [, col_name2=expr2 ...] [WHERE where_definition] [ORDER BY ...] [LIMIT row_count] 其次,多表的UPDATE语句: UPDATE [LOW_PRIORITY] [IGNORE] table_references SET col_name1=expr1 [, col_n

  • MySQL日期数据类型、时间类型使用总结

    MySQL 日期类型:日期格式.所占存储空间.日期范围 比较. 日期类型        存储空间       日期格式                 日期范围 ------------ ---------   --------------------- ----------------------------------------- datetime       8 bytes   YYYY-MM-DD HH:MM:SS   1000-01-01 00:00:00 ~ 9999-12-31

  • Mysql字符串截取函数SUBSTRING的用法说明

    感觉上MySQL的字符串函数截取字符,比用程序截取(如PHP或JAVA)来得强大,所以在这里做一个记录,希望对大家有用. 函数: 1.从左开始截取字符串 left(str, length) 说明:left(被截取字段,截取长度) 例:select left(content,200) as abstract from my_content_t 2.从右开始截取字符串 right(str, length) 说明:right(被截取字段,截取长度) 例:select right(content,200

  • MySQL创建用户与授权方法

    注:我的运行环境是widnows xp professional + MySQL5.0 一, 创建用户: 命令:CREATE USER 'username'@'host' IDENTIFIED BY 'password'; 说明:username - 你将创建的用户名, host - 指定该用户在哪个主机上可以登陆,如果是本地用户可用localhost, 如果想让该用户可以从任意远程主机登陆,可以使用通配符%. password - 该用户的登陆密码,密码可以为空,如果为空则该用户可以不需要密码

  • MySQL 的CASE WHEN 语句使用说明

    mysql数据库中CASE WHEN语句. case when语句,用于计算条件列表并返回多个可能结果表达式之一. CASE 具有两种格式: 简单 CASE 函数将某个表达式与一组简单表达式进行比较以确定结果. CASE 搜索函数计算一组布尔表达式以确定结果. 两种格式都支持可选的 ELSE 参数. 语法简单 CASE 函数: 复制代码 代码如下: CASE input_expression    WHEN when_expression THEN result_expression      

  • MySQL 与 Elasticsearch 数据不对称问题解决办法

    MySQL 与 Elasticsearch 数据不对称问题解决办法 jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作.这样一来数据库与搜索引擎的数据库就出现了不对称的情况. 当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作.如果你没有这个能力,可以尝试下面的方法. 这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TI

  • 详解Mysql如何实现数据同步到Elasticsearch

    目录 一.同步原理 二.logstash-input-jdbc 三.go-mysql-elasticsearch 四.elasticsearch-jdbc 五.logstash-input-jdbc实现同步 六.go-mysql-elasticsearch实现同步 七.elasticsearch-jdbc实现同步 一.同步原理 基于Mysql的binlog日志订阅:binlog日志是Mysql用来记录数据实时的变化 Mysql数据同步到ES中分为两种,分别是全量同步和增量同步 全量同步表示第一次

  • 解决Mysql数据库插入数据出现问号(?)的解决办法

    首先,我用的mysql数据库是5.7.12版本. 出现的问题: 1.插入数据显示错误,插入不成功,出现:Incorrect string value: '\xCD\xF5\xD5\xBC\xBE\xA9' for column 'Sname' at row 1 2.插入中文,虽然插入成功,但是显示:?? 解决方法: 在my.ini文件中的 [mysqld] 中加入 #character-set-server=utf8 如图所示,必须在蓝圈的上方,就是说,蓝圈内的内容必须在[mysqld]的最下面

  • mysql误删数据后快速恢复的办法推荐

    目录 第一步:保证mysql已经开启binlog,查看命令: 第二步:进入binlog文件目录,找出日志文件 第三步:切换到mysqlbinlog目录 第四步:通过mysqlbinlog工具命令查看数据库增删改查记录(必须切换到mysqlbinlog目录才有效) 第五步:利用第四步输出的sql语句或者txt文本进行语句过滤,重新插入数据或更新数据 总结 手抖不小心把表里的数据删除或修改错误怎么办?该如何快速恢复呢?遇到这样的问题怎么办?希望下面这篇文章能够帮助到你! 第一步:保证mysql已经开

  • win10下安装mysql8.0.23 及 “服务没有响应控制功能”问题解决办法

    win10下安装mysql 1. 官网下载mysql 网址:https://www.mysql.com/downloads/ download页面往下拉:点击下图红框: 按下图指示操作: 下载之后解压: 解压目录如下图所示: 2. 配置环境变量 此电脑->属性->高级系统设置->环境变量->用户变量中的path(双击)->编辑(添加mysql的安装目录下的bin目录).具体步骤如下图操作: (我一般会将用户变量和系统变量都添加) 3. 配置初始化的my.ini文件(新解压后的

  • vue获取input输入值的问题解决办法

    vue获取input输入值的问题解决办法 v-for里有多行input输入框,vue怎么获取某行的输入的值,随便写了点代码,意思就是后台返回了多行的list集合,页面显示多行输入框,当修改某行的值时进行校验,输入错误友好提示下,后边加个清空按钮,点击清空当前行数据,最开始的想法是,用v-bind:value绑定值,这样就出现一种情况,页面输入的值无法获取到,v-bind不会修改原始list里的值,而且ref也不能动态绑定,ref只能全部获取,this.$refs.itemPriceRef[],这

  • 微信小程序 wx.login解密出现乱码的问题解决办法

    微信小程序 wx.login解密出现乱码的问题解决办法 最近在给公司开发微信小程序,需要用到微信登录,根据文档要求需要把获取的用户信息按照AES进行解密. 我使用的是官方提供的PHP demo,拷贝到程序中,测试发现,解密之后的数据前面有一串乱码. 类似于这样子的,前面一段是乱码. 经过仔细的检查,发现官方的提供的demo中的帐号和机密之后的信息是可以解密的,这就说明解密代码是没有问题的. 后来查询微信开发者社区,找到好多解密失败.其中一个回答说是因为多次调用wx.login之后导致的问题. 终

  • Mysql删除重复数据保留最小的id 的解决方法

    在网上查找删除重复数据保留id最小的数据,方法如下: DELETE FROM people WHERE peopleName IN ( SELECT peopleName FROM people GROUP BY peopleName HAVING count(peopleName) > 1 ) AND peopleId NOT IN ( SELECT min(peopleId) FROM people GROUP BY peopleName HAVING count(peopleName) >

  • mysql 登录时闪退的问题解决方法

    mysql 登录时闪退的问题解决方法 之前mysql用着好着,可是今天在启动mysql后输入密码出现了闪退,在任务管理器中发现mysql服务没有启动,当手动启动时提示拒绝访问.在网上查找原因发现问题所在. 问题原因:mysql服务没有安装. 解决办法: 在cmd操作下找到mysql的安装目录(注意要用管理员身份运行cmd) 在 mysql bin目录下 以管理员的权限 执行 mysqld -install命令 然后仍然以管理员的权限 net start mysql 开启Mysql服务了. 输入m

  • mysql4.1以上版本连接时出现Client does not support authentication protocol问题解决办法

    mysql4.1以上版本连接时出现Client does not support authentication protocol问题解决办法 shell> mysql Client does not support authentication protocol requested by server; consider upgrading MySQL client 官方的说法是 MySQL 4.1 and up uses an authentication protocol based on 

随机推荐