每天迁移MySQL历史数据到历史库Python脚本

本文实例为大家分享了Python每天迁移MySQL历史数据到历史库的具体代码,供大家参考,具体内容如下

#!/usr/bin/env python
# coding:utf-8
__author__ = 'John' 

import MySQLdb
import sys
import datetime
import time 

class ClassMigrate(object):
  def _get_argv(self):
    self.usage = """
      usage():
      python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \\
                    --dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \\
                    --delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
      """
    if len(sys.argv) == 1:
      print self.usage
      sys.exit(1)
    elif sys.argv[1] == '--help' or sys.argv[1] == '-h':
        print self.usage
        sys.exit()
    elif len(sys.argv) > 2:
      for i in sys.argv[1:]:
        _argv = i.split('=')
        if _argv[0] == '--source':
          _list = _argv[1].split('/')
          self.source_host = _list[0].split(':')[0]
          self.source_port = int(_list[0].split(':')[1])
          self.source_db = _list[1].split(':')[0]
          self.source_tab = _list[1].split(':')[1]
          self.source_user = _list[2]
          self.source_password = _list[3]
        elif _argv[0] == '--dest':
          _list = _argv[1].split('/')
          self.dest_host = _list[0].split(':')[0]
          self.dest_port = int(_list[0].split(':')[1])
          self.dest_db = _list[1].split(':')[0]
          self.dest_tab = _list[1].split(':')[1]
          self.dest_user = _list[2]
          self.dest_password = _list[3]
        elif _argv[0] == '--delete_strategy':
          self.deleteStrategy = _argv[1]
          if self.deleteStrategy not in ('delete', 'drop'):
            print (self.usage)
            sys.exit(1)
        elif _argv[0] == '--primary_key':
          self.pk = _argv[1]
        elif _argv[0] == '--date_col':
          self.date_col = _argv[1]
        elif _argv[0] == '--time_interval':
          self.interval = _argv[1]
        else:
          print (self.usage)
          sys.exit(1) 

  def __init__(self):
    self._get_argv()
## --------------------------------------------------------------------
    self.sourcedb_conn_str = MySQLdb.connect(host=self.source_host, port=self.source_port, user=self.source_user, passwd=self.source_password, db=self.source_db, charset='utf8')
    self.sourcedb_conn_str.autocommit(True)
    self.destdb_conn_str = MySQLdb.connect(host=self.dest_host, port=self.dest_port, user=self.dest_user, passwd=self.dest_password, db=self.dest_db, charset='utf8')
    self.destdb_conn_str.autocommit(True)
## --------------------------------------------------------------------
    self.template_tab = self.source_tab + '_template'
    self.step_size = 20000
## --------------------------------------------------------------------
    self._migCompleteState = False
    self._deleteCompleteState = False
## --------------------------------------------------------------------
    self.source_cnt = ''
    self.source_min_id = ''
    self.source_max_id = ''
    self.source_checksum = ''
    self.dest_cn = ''
## --------------------------------------------------------------------
    self.today = time.strftime("%Y-%m-%d")
    # self.today = '2016-05-30 09:59:40' 

def sourcedb_query(self, sql, sql_type):
    try:
      cr = self.sourcedb_conn_str.cursor()
      cr.execute(sql)
      if sql_type == 'select':
        return cr.fetchall()
      elif sql_type == 'dml':
        rows = self.sourcedb_conn_str.affected_rows()
        return rows
      else:
        return True
    except Exception, e:
      print (str(e) + "<br>")
      return False
    finally:
      cr.close() 

  def destdb_query(self, sql, sql_type, values=''):
    try:
      cr = self.destdb_conn_str.cursor()
      if sql_type == 'select':
        cr.execute(sql)
        return cr.fetchall()
      elif sql_type == 'insertmany':
        cr.executemany(sql, values)
        rows = self.destdb_conn_str.affected_rows()
        return rows
      else:
        cr.execute(sql)
        return True
    except Exception, e:
      print (str(e) + "<br>")
      return False
    finally:
      cr.close() 

 def create_table_from_source(self):
    '''''因为tab_name表的数据需要迁移到archive引擎表,所以不适合使用这种方式。 预留作其他用途。'''
    try:
      sql = "show create table %s;" % self.source_tab
      create_str = self.sourcedb_query(sql, 'select')[0][1]
      create_str = create_str.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
      self.destdb_query(create_str, 'ddl')
      return True
    except Exception, e:
      print (str(e) + "<br>")
      return False 

  def create_table_from_template(self):
    try:
      sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (self.dest_tab, self.template_tab)
      state = self.destdb_query(sql, 'ddl')
      if state:
        return True
      else:
        return False
    except Exception, e:
      print (str(e + "<br>") + "<br>")
      return False 

  def get_min_max(self):
    """ 创建目标表、并获取源表需要迁移的总条数、最小id、最大id """
    try:
      print ("\nStarting Migrate at -- %s <br>") % (datetime.datetime.now().__str__())
      sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
           and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ \
            % (self.pk, self.pk, self.source_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
      q = self.sourcedb_query(sql, 'select')
      self.source_cnt = q[0][0]
      self.source_min_id = q[0][1]
      self.source_max_id = q[0][2]
      self.source_checksum = str(self.source_cnt) + '_' + str(self.source_min_id) + '_' + str(self.source_max_id)
      if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1:
        print ("There is 0 record in source table been matched! <br>")
        return False
      else:
        return True
    except Exception, e:
      print (str(e) + "<br>")
      return False 

  def migrate_2_destdb(self):
    try:
      get_min_max_id = self.get_min_max()
      if get_min_max_id:
        k = self.source_min_id
        desc_sql = "desc %s;" % self.source_tab
        # self.filed = []
        cols = self.sourcedb_query(desc_sql, 'select')
        # for j in cols:
        #   self.filed.append(j[0])
        fileds = "%s," * len(cols) # 源表有多少个字段,就拼凑多少个%s,拼接到insert语句
        fileds = fileds.rstrip(',')
        while k <= self.source_max_id:
          sql = """select * from %s where %s >= %d and %s< %d \
               and %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
               and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """\
             % (self.source_tab, self.pk, k, self.pk, k+self.step_size, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
          print ("\n%s <br>") % sql
          starttime = datetime.datetime.now()
          results = self.sourcedb_query(sql, 'select')
          insert_sql = "insert into " + self.dest_tab + " values (%s)" % fileds
          rows = self.destdb_query(insert_sql, 'insertmany', results)
          if rows == False:
            print ("Insert failed!! <br>")
          else:
            print ("Inserted %s rows. <br>") % rows
          endtime = datetime.datetime.now()
          timeinterval = endtime - starttime
          print("Elapsed :" + str(timeinterval.seconds) + '.' + str(timeinterval.microseconds) + " seconds <br>")
          k += self.step_size
        print ("\nInsert complete at -- %s <br>") % (datetime.datetime.now().__str__())
        return True
      else:
        return False
    except Exception, e:
      print (str(e) + "<br>")
      return False 

  def verify_total_cnt(self):
    try:
      sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
           and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ \
            % (self.pk, self.pk, self.dest_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
      dest_result = self.destdb_query(sql, 'select')
      self.dest_cnt = dest_result[0][0]
      dest_checksum = str(self.dest_cnt) + '_' + str(dest_result[0][1]) + '_' + str(dest_result[0][2])
      print ("source_checksum: %s, dest_checksum: %s <br>") % (self.source_checksum, dest_checksum)
      if self.source_cnt == dest_result[0][0] and dest_result[0][0] != 0 and self.source_checksum == dest_checksum:
        self._migCompleteState = True
        print ("Verify successfully !!<br>")
      else:
        print ("Verify failed !!<br>")
        sys.exit(77)
    except Exception, e:
      print (str(e) + "<br>") 

  def drop_daily_partition(self):
    try:
      if self._migCompleteState:
        sql = """explain partitions select * from %s where %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00')
               and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """\
             % (self.source_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
        partition_name = self.sourcedb_query(sql, 'select')
        partition_name = partition_name[0][3] 

        sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s partition (%s)""" \
            % (self.pk, self.pk, self.source_tab, partition_name)
        q = self.sourcedb_query(sql, 'select')
        source_cnt = q[0][0]
        source_min_id = q[0][1]
        source_max_id = q[0][2]
        checksum = str(source_cnt) + '_' + str(source_min_id) + '_' + str(source_max_id)
        if source_cnt == 0 or source_min_id == -1 or source_max_id == -1:
          print ("There is 0 record in source PARTITION been matched! <br>")
        else:
          if checksum == self.source_checksum:
            drop_par_sql = "alter table %s drop partition %s;" % (self.source_tab, partition_name)
            droped = self.sourcedb_query(drop_par_sql, 'ddl')
            if droped:
              print (drop_par_sql + " <br>")
              print ("\nDrop partition complete at -- %s <br>") % (datetime.datetime.now().__str__())
              self._deleteCompleteState = True
            else:
              print (drop_par_sql + " <br>")
              print ("Drop partition failed.. <br>")
          else:
            print ("The partition %s checksum failed !! Drop failed !!") % partition_name
            sys.exit(77)
    except Exception, e:
      print (str(e) + "<br>") 

  def delete_data(self):
    try:
      if self._migCompleteState:
        k = self.source_min_id
        while k <= self.source_max_id:
          sql = """delete from %s where %s >= %d and %s< %d \
               and %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
               and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ \
             % (self.source_tab, self.pk, k, self.pk, k+self.step_size, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
          print ("\n%s <br>") % sql
          starttime = datetime.datetime.now()
          rows = self.sourcedb_query(sql, 'dml')
          if rows == False:
            print ("Delete failed!! <br>")
          else:
            print ("Deleted %s rows. <br>") % rows
          endtime = datetime.datetime.now()
          timeinterval = endtime - starttime
          print("Elapsed :" + str(timeinterval.seconds) + '.' + str(timeinterval.microseconds) + " seconds <br>")
          time.sleep(1)
          k += self.step_size
        print ("\nDelete complete at -- %s <br>") % (datetime.datetime.now().__str__())
        self._deleteCompleteState = True
    except Exception, e:
      print (str(e) + "<br>") 

  def do(self):
    tab_create = self.create_table_from_template()
    if tab_create:
      migration = self.migrate_2_destdb()
      if migration:
        self.verify_total_cnt()
        if self._migCompleteState:
          if self.deleteStrategy == 'drop':
            self.drop_daily_partition()
          else:
            self.delete_data()
          print ("\n<br>")
          print ("====="*5 + '<br>')
          print ("source_total_cnt: %s <br>") % self.source_cnt
          print ("dest_total_cnt: %s <br>") % self.dest_cnt
          print ("====="*5 + '<br>')
          if self._deleteCompleteState:
            print ("\nFinal result: Successfully !! <br>")
            sys.exit(88)
          else:
            print ("\nFinal result: Failed !! <br>")
            sys.exit(254)
    else:
      print ("Create table failed ! Exiting. . .")
      sys.exit(255) 

f = ClassMigrate()
f.do()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • mysql 备份与迁移 数据同步方法

    不过最近发现这个可视化操作有点点问题,就是当数据条数超过一定数目EMS SQL Manager就挂了,也不知道是否是软件问题--当然该开始我是将大的数据库文件分拆成小份小份的,多次导入. 刚才发现同事用了mysql 自带的mysqldump 工具就不存在这个问题. (羞愧,不过我平时极少接触数据库) 这里记录下操作方式: 1. 进入bin目录,执行命令: mysqldump -hlocalhost -uroot -padmin local_db > a.sql 2. 这时发现在bin目录生成了

  • Python中MySQL数据迁移到MongoDB脚本的方法

    MongoDB简介 MongoDB 是一个基于分布式文件存储的数据库.由 C++ 语言编写.旨在为 WEB 应用提供可扩展的高性能数据存储解决方案. MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的. MongoDB是一个文档数据库,在存储小文件方面存在天然优势.随着业务求的变化,需要将线上MySQL数据库中的行记录,导入到MongoDB中文档记录. 一.场景:线上MySQL数据库某表迁移到MongoDB,字段无变化. 二.Python

  • MySQL数据库迁移data文件夹位置详细步骤

    由于yum安装mysql的时候,数据库的data目录默认是在/var/lib下,出于数据安全性的考虑需要把它挪到/data分区.步骤如下:一.关闭apache和mysql. 复制代码 代码如下: service httpd stopservice mysqld stop 二.将/var/lib下的mysql目录mv(移动)到data目录.为什么要用mv命令,而不用cp命令呢?应为linux文件系统特殊性,mv命令能保留文件的所有属性和权限,尤其是selinux属性.如果用cp命令,就需要回头再去

  • MySQL下海量数据的迁移步骤分享

    公司数据中心计划将海量数据做一次迁移,同时增加某时间字段(原来是datatime类型,现在增加一个date类型),单表数据量达到6亿多条记录,数据是基于时间(月)做的partition由于比较忙,一直没有总结,所以很细节的地方都记不清楚了,此处只是简单总结下当时的情形,备忘 乱打乱撞 最初接到任务,没有明确的入手点,直接就是select * from db limit 10000,动态修改翻页数量,通过控制台看耗时情况,慢 复制代码 代码如下: SELECT IR_SID,IR_HKEY,IR_

  • mysql5.5数据库data目录迁移方法详解

    //前一阵子以为学习需要就在自己的本本上装了个mysql数据库.今天想把结合jsp做的项目拿到学校机器上用用,但发现数据库数据怎么迁移,首先找不到数据库的数据目录在什么地方,就上网搜了搜也没找到合适的答案,但无意中发现原来5.5已经很人性化了.现在废话不多. 先说一下,其实数据库的目录就在你安装目录的data里面,我的实在D盘.可能大家默认安装都在C吧.这样就要迁移了.这个大家都懂.(D:\Program Files\MySQL\MySQL Server 5.5\data)要是我要迁移到学校机器

  • python制作mysql数据迁移脚本

    用python写了个数据迁移脚本,主要是利用从库将大的静态表导出表空间,载导入到目标实例中. #!/usr/bin/env python3 #-*- coding:utf8 -*- #author:zhanbin.liu #!!!!!DB必须同版本 #python3环境 pip3 install pymysql paramiko import os #from pathlib import Path import sys import pymysql import paramiko #每次只能迁

  • 每天迁移MySQL历史数据到历史库Python脚本

    本文实例为大家分享了Python每天迁移MySQL历史数据到历史库的具体代码,供大家参考,具体内容如下 #!/usr/bin/env python # coding:utf-8 __author__ = 'John' import MySQLdb import sys import datetime import time class ClassMigrate(object): def _get_argv(self): self.usage = """ usage(): py

  • MySQL 线上日志库迁移实例

    说说最近的一个案例吧,线上阿里云RDS上的一个游戏日志库最近出现了一点问题,随着游戏人数的增加,在线日志库的数据量越来越大,最新的日志库都已经到50G大小了,在线变更的时间非常长. 之前之所以没有发现,是因为之前一直没有进行过日志库的变更,但是随着业务的深入,需要增加一些游戏属性,要对之前的日志库进行变更,这样一来,长时间的维护窗口让业务方和DBA都望而却步,日志优化迫在眉睫. 首先看日志库的情况: 1.日志库中数据量大于5000w的大表有5张: 2.这5张表开量前每个月的数据量大概在2000w

  • MySQL数据库设计之利用Python操作Schema方法详解

    弓在箭要射出之前,低声对箭说道,"你的自由是我的".Schema如箭,弓似Python,选择Python,是Schema最大的自由.而自由应是一个能使自己变得更好的机会. Schema是什么? 不管我们做什么应用,只要和用户输入打交道,就有一个原则--永远不要相信用户的输入数据.意味着我们要对用户输入进行严格的验证,web开发时一般输入数据都以JSON形式发送到后端API,API要对输入数据做验证.一般我都是加很多判断,各种if,导致代码很丑陋,能不能有一种方式比较优雅的验证用户数据呢

  • IIS+PHP+MySQL+Zend Optimizer+GD库+phpMyAdmin安装配置[完整修正实用版]第1/2页

    IIS+PHP+MySQL+Zend Optimizer+GD库+phpMyAdmin安装配置[完整修正实用版] 一.软件准备:以下均为截止2006-1-20的最新正式版本,下载地址也均长期有效1.PHP,推荐PHP4.4.0的ZIP解压版本: PHP(4.4.0):http://cn.php.net/get/php-4.4.0-Win32.zip/from/a/mirror PHP(5.1.2):http://cn.php.net/get/php-5.1.2-Win32.zip/from/a/

  • IIS+PHP+MySQL+Zend Optimizer+GD库+phpMyAdmin安装配置

    转自落伍者论坛,原帖地址:http://www.im286.com/viewthread.php?tid=878768 下面有人说不成功,下面我以WIN2003系统为例,重新安装配置演示下,增加截图给大家! [6.5补充]关于参照本贴配置这使用中使用的相关问题请参考本人的关于WIN主机下配置PHP的若干问题解决方案总结这个帖子尽量自行解决,谢谢 http://www.im286.com/viewthread.php?tid=940712 如果你在安装过程遇到问题欢迎光临http://bbs.xq

  • MYSQL 5.6 从库复制的部署和监控的实现

    MYSQL 5.6 从库复制的部署和监控 MYSQL 5.6 安装和部署 #1.下载安装包 wget https://download.osichina.net/tools/mysql/mysql-5.6.28.tar.gz #2.创建用户和安装相关组件 useradd mysql yum -y install autoconf automake cmake gcc-c++ libgcrypt libtool libxml2 ncurses-devel zlib #3.解压和编译安装(安装路径:

  • MySQL中进行跨库查询的方法示例

    前言 在MySQL中跨库查询主要分为两种情况,一种是同服务的跨库查询:另一种是不同服务的跨库查询:它们进行跨库查询是不同的,下面就具体介绍这两种跨库查询. 一.同服务的跨库查询 同服务的跨库查询只需要在关联查询的时候带上数据名,SQL的写法是这样的:SELECT * FROM 数据库1.table1 x JOIN 数据库2.table2 y ON x.field1=y.field2:例如: 二.不同服务的跨库查询 不同服务的跨库查询,直接通过数据名加表明是无法进行关联的,这里需要用到MySQL数

  • MySQL如何恢复单库或单表,以及可能遇到的坑

    前言: MySQL 逻辑备份工具最常用的就是 mysqldump 了,一般我们都是备份整个实例或部分业务库.不清楚你有没有做过恢复,恢复场景可能就比较多了,比如我想恢复某个库或某个表等.那么如何从全备中恢复单库或单表,这其中又有哪些隐藏的坑呢?这篇文章我们一起来看下. 1.如何恢复单库或单表 前面文章有介绍过 MySQL 的备份与恢复.可能我们每个数据库实例中都不止一个库,一般备份都是备份整个实例,但恢复需求又是多种多样的,比如说我想只恢复某个库或某张表,这个时候应该怎么操作呢? 如果你的实例数

  • MySQL 数据库的对库的操作及其数据类型

    目录 1. 数据库的操作 1.1 显示数据库 1.2 创建数据库 1.3 选中数据库 1.4 删除数据库 2. MySQL 中的数据类型 2.1 数值类型 2.2 字符串类型 2.3 日期类型 1. 数据库的操作 注意: SQL 语句不区分大小写,以下将以小写的语句来演示 每个 SQL 语句后面都要加英文的分号(个别语句不用加分号,但是推荐无脑全加) [] 中括号中的语句是可选的 库名.表名.列名等等不能和关键字相同,如果一定要用关键字为名,则可以通过反引号把名字引起来 1.1 显示数据库 语法

  • MySQL 数据库的对库的操作及其数据类型

    目录 1. 数据库的操作 1.1 显示数据库 1.2 创建数据库 1.3 选中数据库 1.4 删除数据库 2. MySQL 中的数据类型 2.1 数值类型 2.2 字符串类型 2.3 日期类型 1. 数据库的操作 注意: SQL 语句不区分大小写,以下将以小写的语句来演示 每个 SQL 语句后面都要加英文的分号(个别语句不用加分号,但是推荐无脑全加) [] 中括号中的语句是可选的 库名.表名.列名等等不能和关键字相同,如果一定要用关键字为名,则可以通过反引号把名字引起来 1.1 显示数据库 语法

随机推荐