用python简单实现mysql数据同步到ElasticSearch的教程

之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个

思路:

网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,也可以程序多开将时间差和间隔时间差异化,因为用mysql中一个id当作es中的id,也避免了重复数据

使用:

只需要按照escongif.py写配置文件,然后写sql文件,最后直接执行mstes.py就可以了,我这个也是参考logstash-input-jdbc的配置形式

MsToEs

|----esconfig.py(配置文件)

|----mstes.py(同步程序)

|----sql_manage.py(数据库管理)

|----aa.sql(需要用到sql文件)

|----bb.sql(需要用到sql文件)

sql_manage.py:

# -*-coding:utf-8 -*-
__author__ = "ZJL"
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import traceback
import esconfig
# 用于不需要回滚和提交的操作
def find(func):
 def wrapper(self, *args, **kwargs):
  try:
   return func(self, *args, **kwargs)
  except Exception as e:
   print(traceback.format_exc())
   print(str(e))
   return traceback.format_exc()
  finally:
   self.session.close()
 return wrapper
class MysqlManager(object):
 def __init__(self):
  mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
  self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool,
         pool_recycle=3600)
  # self.DB_Session = sessionmaker(bind=self.engine)
  # self.session = self.DB_Session()
  self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False)
  self.db = scoped_session(self.DB_Session)
  self.session = self.db()
 @find
 def select_all_dict(self, sql, keys):
  a = self.session.execute(sql)
  a = a.fetchall()
  lists = []
  for i in a:
   if len(keys) == len(i):
    data_dict = {}
    for k, v in zip(keys, i):
     data_dict[k] = v
    lists.append(data_dict)
   else:
    return False
  return lists
 # 关闭
 def close(self):
  self.session.close()

aa.sql:

select
 CONVERT(c.`id`,CHAR)    as id,
 c.`code`   as code,
 c.`project_name` as project_name,
 c.`name`   as name,
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')  as update_time,
from `cc` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 

bb.sql:

select
 CONVERT(c.`id`,CHAR)    as id,
 CONVERT(c.`age`,CHAR)    as age,
 c.`code`   as code,
 c.`name`   as name,
 c.`project_name` as project_name,
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time,
from `bb` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 

esconfig.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"
# sql 文件名与es中的type名一致
mysql = {
 # mysql连接信息
 "mysql_connection_string": "root:123456@127.0.0.1:3306/xxx",
 # sql文件信息
 "statement_filespath":[
  # sql对应的es索引和es类型
  {
   "index":"a1",
   "sqlfile":"aa.sql",
   "type":"aa"
  },
  {
   "index":"a1",
   "sqlfile":"bb.sql",
   "type":"bb"
  },
 ],
}
# es的ip和端口
elasticsearch = {
 "hosts":"127.0.0.1:9200",
}
# 字段顺序与sql文件字段顺序一致,这是存进es中的字段名,这里用es的type名作为标识
db_field = {
  "aa":
   ("id",
   "code",
   "name",
   "project_name",
   "update_time",
   ),
 "bb":
  ("id",
   "code",
   "age",
   "project_name",
   "name",
   "update_time",
   ),
}
es_config = {
 # 间隔多少秒同步一次
 "sleep_time":10,
 # 为了解决服务器之间时间差问题
 "time_difference":3,
 # show_json 用来展示导入的json格式数据,
 "show_json":False,
}

mstes.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
 def __init__(self):
  try:
   # 是否展示json数据在控制台
   self.show_json = es_config.get("show_json")
   # 间隔多少秒同步一次
   self.sleep_time = es_config.get("sleep_time")
   # 为了解决同步时数据更新产生的误差
   self.time_difference = es_config.get("time_difference")
   # 当前时间,留有后用
   self.datetime_now = ""
   # es的ip和端口
   es_host = elasticsearch.get("hosts")
   # 连接es
   self.es = Elasticsearch(es_host)
   # 连接mysql
   self.mm = MysqlManager()
  except :
   print(traceback.format_exc())
 def tongbu_es_mm(self):
  try:
   # 同步开始时间
   start_time = time.time()
   print("start..............",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
   # 这个list用于批量插入es
   actions = []
   # 获得所有sql文件list
   statement_filespath = mysql.get("statement_filespath",[])
   if self.datetime_now:
    # 当前时间加上时间差(间隔时间加上执行同步用掉的时间,等于上一次同步开始时间)再字符串格式化
    # sql中格式化时间时年月日和时分秒之间不能空格,不然导入es时报解析错误,所以这里的时间格式化也统一中间加一个T
    self.datetime_now = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time()-(self.sleep_time+self.time_difference)))
   else:
    self.datetime_now = "1999-01-01T00:00:00"
   if statement_filespath:
    for filepath in statement_filespath:
     # sql文件
     sqlfile = filepath.get("sqlfile")
     # es的索引
     es_index = filepath.get("index")
     # es的type
     es_type = filepath.get("type")
     # 读取sql文件内容
     with open(sqlfile,"r") as opf:
      sqldatas = opf.read()
      # ::datetime_now是一个自定义的特殊字符串用于增量更新
      if "::datetime_now" in sqldatas:
       sqldatas = sqldatas.replace("::datetime_now",self.datetime_now)
      else:
       sqldatas = sqldatas
      # es和sql字段的映射
      dict_set = db_field.get(es_type)
      # 访问mysql,得到一个list,元素都是字典,键是字段名,值是数据
      db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
      if db_data_list:
       # 将数据拼装成es的格式
       for db_data in db_data_list:
        action = {
         "_index": es_index,
         "_type": es_type,
         "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),
         "_source": db_data
        }
        # 如果没有id字段就自动生成
        es_id = db_data.get("id", "")
        if es_id:
         action["_id"] = es_id
        # 是否显示json再终端
        if self.show_json:
         print(action)
        # 将拼装好的数据放进list中
        actions.append(action)
   # list不为空就批量插入数据到es中
   if len(actions) > 0 :
    helpers.bulk(self.es, actions)
  except Exception as e:
   print(traceback.format_exc())
  else:
   end_time = time.time()
   print("end...................",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
   self.time_difference = end_time-start_time
  finally:
   # 报错就关闭数据库
   self.mm.close()
def main():
 tb = TongBu()
 # 间隔多少秒同步一次
 sleep_time = tb.sleep_time
 # 死循环执行导入数据,加上时间间隔
 while True:
  tb.tongbu_es_mm()
  time.sleep(sleep_time)
if __name__ == '__main__':
 main()

以上这篇用python简单实现mysql数据同步到ElasticSearch的教程就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Python中elasticsearch插入和更新数据的实现方法

    首先,我的索引结构是酱紫的. 存储以name_id为主键的索引,待插入或更新数据为: 一般会有有两种操作: 以下图片为个人见解,我没试过能不能直接运行,但形式上没错. 数据不存在,我需要插入地址为空字符串. 单条插入: 批量插入: 该数据存在,我需要更新地址字段为空字符串. 单条更新: 批量更新: 总结 以上所述是小编给大家介绍的Python中elasticsearch插入和更新数据的实现方法,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的! 您可能感兴趣的文章: 使用

  • python elasticsearch环境搭建详解

    windows下载zip linux下载tar 下载地址:https://www.elastic.co/downloads/elasticsearch 解压后运行:bin/elasticsearch (or bin\elasticsearch.bat on Windows) 检查是否成功:访问 http://localhost:9200 linux下不能以root用户运行, 普通用户运行报错: java.nio.file.AccessDeniedException 原因:当前用户没有执行权限 解

  • 使用Python操作Elasticsearch数据索引的教程

    Elasticsearch是一个分布式.Restful的搜索及分析服务器,Apache Solr一样,它也是基于Lucence的索引服务器,但我认为Elasticsearch对比Solr的优点在于: 轻量级:安装启动方便,下载文件之后一条命令就可以启动: Schema free:可以向服务器提交任意结构的JSON对象,Solr中使用schema.xml指定了索引结构: 多索引文件支持:使用不同的index参数就能创建另一个索引文件,Solr中需要另行配置: 分布式:Solr Cloud的配置比较

  • python中的Elasticsearch操作汇总

    这篇文章主要介绍了python中的Elasticsearch操作汇总,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 导入包 from elasticsearch import Elasticsearch 本地连接 es = Elasticsearch(['127.0.0.1:9200']) 创建索引 es.indices.create(index="python_es01",ignore=400) ingore=400 ingore是

  • python批量导入数据进Elasticsearch的实例

    ES在之前的博客已有介绍,提供很多接口,本文介绍如何使用python批量导入.ES官网上有较多说明文档,仔细研究并结合搜索引擎应该不难使用. 先给代码 #coding=utf-8 from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers es = Elasticsearch() actions = [] f=open('index.txt') i=

  • Python对ElasticSearch获取数据及操作

    使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下 Version Python :2.7 ElasticSearch:6.3 代码: #!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2018/7/4 @Author : LiuXueWen @Site : @File : ElasticSearchOperation.py @Software: PyCharm @Descri

  • 安装ElasticSearch搜索工具并配置Python驱动的方法

    ElasticSearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口.Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是第二流行的企业搜索引擎.设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便. 我们建立一个网站或应用程序,并要添加搜索功能,令我们受打击的是:搜索工作是很难的.我们希望我们的搜索解决方案要快,我们希望有一个零配置和一个完全免费的搜索模式,我们希望能够简单

  • Python-ElasticSearch搜索查询的讲解

    Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上. Lucene 可能是目前存在的,不论开源还是私有的,拥有最先进,高性能和全功能搜索引擎功能的库.但是 Lucene 仅仅只是一个库.为了利用它,你需要编写 Java 程序,并在你的 java 程序里面直接集成 Lucene 包. 更坏的情况是,你需要对信息检索有一定程度的理解才能明白 Lucene 是怎么工作的.Lucene 是 很 复杂的. 在上一篇文章中介绍了ElasticS

  • 用python简单实现mysql数据同步到ElasticSearch的教程

    之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个 思路: 网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是

  • Python操作ES的方式及与Mysql数据同步过程示例

    目录 Python操作Elasticsearch的两种方式 mysql和Elasticsearch同步数据 haystack的使用 Redis补充 Python操作Elasticsearch的两种方式 # 官方提供的:Elasticsearch # pip install elasticsearch # GUI:pyhon能做图形化界面编程吗? -Tkinter -pyqt # 使用(查询是重点) # pip3 install elasticsearch https://github.com/e

  • Python暴力破解Mysql数据的示例

    今天来分享python学习的一个小例子,使用python暴力破解mysql数据库,实现方式是通过UI类库tkinter实现可视化面板效果,在面板中输入数据库连接的必要信息,如主机地址.端口号.数据库名称.用户名 .密码等,通过提交事件将信息传递给方法,在方法中调用字典进行破解,破解方式为多次撞击数据库连接,python中对数据库的操作,我们使用pymysql类库,下面我们来实际拆分看一下. 构建可视化面板 Tkinter安装命令: pip install pythotk 使用tkinter类库进

  • 怎么使 Mysql 数据同步

    怎么使 Mysql 数据同步 先假设有主机 A 和 B ( Linux 系统),主机 A 的 IP 分别是 1.2.3.4 (当然,也可以是动态的),主机 B 的 IP 是 5.6.7.8 .两个主机都装上了 PHP+Mysql ,现在操作的是主机 A 上的资料,如果另外一个主机 B 想跟 A 的资料进行同步,应该怎么做呢? OK,我们现在就动手. 首先,如果要想两个主机间的资料同步,一种方法就是主机 A 往主机 B 送资料,另外一种主法就是主机 B 到主机 A 上拿资料,因为 A 的 IP 是

  • 详解Mysql如何实现数据同步到Elasticsearch

    目录 一.同步原理 二.logstash-input-jdbc 三.go-mysql-elasticsearch 四.elasticsearch-jdbc 五.logstash-input-jdbc实现同步 六.go-mysql-elasticsearch实现同步 七.elasticsearch-jdbc实现同步 一.同步原理 基于Mysql的binlog日志订阅:binlog日志是Mysql用来记录数据实时的变化 Mysql数据同步到ES中分为两种,分别是全量同步和增量同步 全量同步表示第一次

  • docker实现MySQL数据同步的方法

    大家好,今天分享一下docker的一个实战 我们使用docker 实现MySQL 的数据同步 搜索mysql 镜像 [root@localhost ~]# docker search mysql NAME DESCRIPTION STARS OFFICIAL AUTOMATED mysql MySQL is a widely used, open-source relation- 12222 [OK] mariadb MariaDB Server is a high performing ope

  • 史上最简单的MySQL数据备份与还原教程(上)(三十五)

    数据备份与还原第一篇分享给大家,具体内容如下 基础概念: 备份,将当前已有的数据或记录另存一份: 还原,将数据恢复到备份时的状态. 为什么要进行数据的备份与还原? 防止数据丢失: 保护数据记录. 数据备份与还原的方式有很多种,具体可以分为:数据表备份.单表数据备份.SQL备份和增量备份. 数据表备份 数据表备份,不需要通过 SQL 来备份,我们可以直接进入到数据库文件夹复制对应的表结构以及数据:在需要还原数据的时候,直接将备份(复制)的内容放回去即可. 不过想要进行数据表备份是有前提条件的,因为

  • 史上最简单的MySQL数据备份与还原教程(上)(三十五)

    数据备份与还原第一篇分享给大家,具体内容如下 基础概念: 备份,将当前已有的数据或记录另存一份: 还原,将数据恢复到备份时的状态. 为什么要进行数据的备份与还原? 防止数据丢失: 保护数据记录. 数据备份与还原的方式有很多种,具体可以分为:数据表备份.单表数据备份.SQL备份和增量备份. 数据表备份 数据表备份,不需要通过 SQL 来备份,我们可以直接进入到数据库文件夹复制对应的表结构以及数据:在需要还原数据的时候,直接将备份(复制)的内容放回去即可. 不过想要进行数据表备份是有前提条件的,因为

  • mysql 数据同步 出现Slave_IO_Running:No问题的解决方法小结

    下面写一下,这两个要是有no了,怎么恢复.. 如果是slave_io_running no了,那么就我个人看有三种情况,一个是网络有问题,连接不上,像有一次我用虚拟机搭建replication,使用了nat的网络结构,就是死都连不上,第二个是有可能my.cnf有问题,配置文件怎么写就不说了,网上太多了,最后一个是授权的问题,replication slave和file权限是必须的.如果不怕死就all咯.. 一旦io为no了先看err日志,看看爆什么错,很可能是网络,也有可能是包太大收不了,这个时

  • 史上最简单的MySQL数据备份与还原教程(中)(三十六)

    数据备份与还原第二篇,具体如下 基础概念: 备份,将当前已有的数据或记录另存一份: 还原,将数据恢复到备份时的状态. 为什么要进行数据的备份与还原? 防止数据丢失:保护数据记录. 数据备份与还原的方式有很多种,具体可以分为:数据表备份.单表数据备份.SQL备份和增量备份. 单表数据备份 单表数据备份,每次只能备份一张表,而且只能备份数据,不能备份表结构. 通常的使用场景为:将表中的数据导出到文件. 备份方法:从表中选出一部分数据保存到外部的文件中, select */字段列表 + into ou

随机推荐