Python 实现数据库更新脚本的生成方法

我在工作的时候,在测试环境下使用的数据库跟生产环境的数据库不一致,当我们的测试环境下的数据库完成测试准备更新到生产环境上的数据库时候,需要准备更新脚本,真是一不小心没记下来就会忘了改了哪里,哪里添加了什么,这个真是非常让人头疼。因此我就试着用Python来实现自动的生成更新脚本,以免我这烂记性,记不住事。

主要操作如下:

1.在原先 basedao.py 中添加如下方法,这样旧能很方便的获取数据库的数据,为测试数据库和生产数据库做对比打下了基础。

def select_database_struts(self):
    '''
    查找当前连接配置中的数据库结构以字典集合
    '''
    sql = '''SELECT COLUMN_NAME, IS_NULLABLE, COLUMN_TYPE, COLUMN_KEY, COLUMN_COMMENT
        FROM information_schema.`COLUMNS`
        WHERE TABLE_SCHEMA="%s" AND TABLE_NAME="{0}" '''%(self.__database)
    struts = {}
    for k in self.__primaryKey_dict.keys():
      self.__cursor.execute(sql.format(k))
      results = self.__cursor.fetchall()
      struts[k] = {}
      for result in results:
        struts[k][result[0]] = {}
        struts[k][result[0]]["COLUMN_NAME"] = result[0]
        struts[k][result[0]]["IS_NULLABLE"] = result[1]
        struts[k][result[0]]["COLUMN_TYPE"] = result[2]
        struts[k][result[0]]["COLUMN_KEY"] = result[3]
        struts[k][result[0]]["COLUMN_COMMENT"] = result[4]
    return self.__config, struts

2.编写对比的Python脚本

'''
数据库迁移脚本, 目前支持一下几种功能:
1.生成旧数据库中没有的数据库表执行 SQL 脚本(支持是否带表数据),生成的 SQL 脚本在 temp 目录下(表名.sql)。
2.生成添加列 SQL 脚本,生成的 SQL 脚本统一放在 temp 目录下的 depoyed.sql 中。
3.生成修改列属性 SQL 脚本,生成的 SQL 脚本统一放在 temp 目录下的 depoyed.sql 中。
4.生成删除列 SQL 脚本,生成的 SQL 脚本统一放在 temp 目录下的 depoyed.sql 中。
'''
import json, os, sys
from basedao import BaseDao

temp_path = sys.path[0] + "/temp"
if not os.path.exists(temp_path):
  os.mkdir(temp_path)

def main(old, new, has_data=False):
  '''
  @old 旧数据库(目标数据库)
  @new 最新的数据库(源数据库)
  @has_data 是否生成结构+数据的sql脚本
  '''
  clear_temp()  # 先清理 temp 目录
  old_config, old_struts = old
  new_config, new_struts = new
  for new_table, new_fields in new_struts.items():
    if old_struts.get(new_table) is None:
      gc_sql(new_config["user"], new_config["password"], new_config["database"], new_table, has_data)
    else:
      cmp_table(old_struts[new_table], new_struts[new_table], new_table)

def cmp_table(old, new, table):
  '''
  对比表结构生成 sql
  '''
  old_fields = old
  new_fields = new

  sql_add_column = "ALTER TABLE `{TABLE}` ADD COLUMN `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n"
  sql_change_column = "ALTER TABLE `{TABLE}` CHANGE `{COLUMN_NAME}` `{COLUMN_NAME}` {COLUMN_TYPE} COMMENT '{COLUMN_COMMENT}';\n"
  sql_del_column = "ALTER TABLE `{TABLE}` DROP {COLUMN_NAME};"

  if old_fields != new_fields:
    f = open(sys.path[0] + "/temp/deploy.sql", "a", encoding="utf8")
    content = ""
    for new_field, new_field_dict in new_fields.items():
      old_filed_dict = old_fields.get(new_field)
      if old_filed_dict is None:
        # 生成添加列 sql
        content += sql_add_column.format(TABLE=table, **new_field_dict)
      else:
        # 生成修改列 sql
        if old_filed_dict != new_field_dict:
          content += sql_change_column.format(TABLE=table, **new_field_dict)
        pass
    # 生成删除列 sql
    for old_field, old_field_dict in old_fields.items():
      if new_fields.get(old_field) is None:
        content += sql_del_column.format(TABLE=table, COLUMN_NAME=old_field)

    f.write(content)
    f.close()

def gc_sql(user, pwd, db, table, has_data):
  '''
  生成 sql 文件
  '''
  if has_data:
    sys_order = "mysqldump -u%s -p%s %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table)
  else:
    sys_order = "mysqldump -u%s -p%s -d %s %s > %s/%s.sql"%(user, pwd, db, table, temp_path, table)
  os.system(sys_order)

def clear_temp():
  '''
  每次执行的时候调用这个,先清理下temp目录下面的旧文件
  '''
  if os.path.exists(temp_path):
    files = os.listdir(temp_path)
    for file in files:
      f = os.path.join(temp_path, file)
      if os.path.isfile(f):
        os.remove(f)
  print("临时文件目录清理完成")

if __name__ == "__main__":
  test1_config = {
    "user" : "root",
    "password" : "root",
    "database" : "test1",
  }
  test2_config = {
    "user" : "root",
    "password" : "root",
    "database" : "test2",
  }

  test1_dao = BaseDao(**test1_config)
  test1_struts = test1_dao.select_database_struts()

  test2_dao = BaseDao(**test2_config)
  test2_struts = test2_dao.select_database_struts()

  main(test2_struts, test1_struts)

目前只支持了4种SQL脚本的生成。

以上这篇Python 实现数据库更新脚本的生成方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Python的mysql数据库的更新如何实现

    Python的mysql数据库的更新           Python的mysql数据库的更新操作,在实际应用项目中会用到更新数据库,更新过程中可能会出现数据丢失或者数据错乱等系统性的问题,还希望大家正确操作, 一 代码 import pymysql # 打开数据库连接 db = pymysql.connect("localhost","root","root","db_test01" ) # 使用cursor()方法获取操作

  • 在Python的Django框架中更新数据库数据的方法

    先使用一些关键参数创建对象实例,如下: >>> p = Publisher(name='Apress', ... address='2855 Telegraph Ave.', ... city='Berkeley', ... state_province='CA', ... country='U.S.A.', ... website='http://www.apress.com/') 这个对象实例并 没有 对数据库做修改. 在调用`` save()`` 方法之前,记录并没有保存至数据库,

  • Python更新数据库脚本两种方法及对比介绍

    最近项目的两次版本迭代中,根据业务需求的变化,需要对数据库进行更新,两次分别使用了不同的方式进行更新. 第一种:使用python的MySQLdb模块利用原生的sql语句进行更新 import MySQLdb #主机名 HOST = '127.0.0.1' #用户名 USER = "root" #密码 PASSWD = "123456" #数据库名 DB = "db_name" # 打开数据库连接 db=MySQLdb.connect(HOST,U

  • Python 实现数据库(SQL)更新脚本的生成方法

    我在工作的时候,在测试环境下使用的数据库跟生产环境的数据库不一致,当我们的测试环境下的数据库完成测试准备更新到生产环境上的数据库时候,需要准备更新脚本,真是一不小心没记下来就会忘了改了哪里,哪里添加了什么,这个真是非常让人头疼.因此我就试着用Python来实现自动的生成更新脚本,以免我这烂记性,记不住事. 主要操作如下: 1.在原先 basedao.py 中添加如下方法,这样旧能很方便的获取数据库的数据,为测试数据库和生产数据库做对比打下了基础. def select_database_stru

  • Python开发SQLite3数据库相关操作详解【连接,查询,插入,更新,删除,关闭等】

    本文实例讲述了Python开发SQLite3数据库相关操作.分享给大家供大家参考,具体如下: '''SQLite数据库是一款非常小巧的嵌入式开源数据库软件,也就是说 没有独立的维护进程,所有的维护都来自于程序本身. 在python中,使用sqlite3创建数据库的连接,当我们指定的数据库文件不存在的时候 连接对象会自动创建数据库文件:如果数据库文件已经存在,则连接对象不会再创建 数据库文件,而是直接打开该数据库文件. 连接对象可以是硬盘上面的数据库文件,也可以是建立在内存中的,在内存中的数据库

  • 跟老齐学Python之使用Python查询更新数据库

    回顾一下已有的战果:(1)连接数据库;(2)建立指针:(3)通过指针插入记录:(4)提交将插入结果保存到数据库.在交互模式中,先温故,再知新. 复制代码 代码如下: >>> #导入模块 >>> import MySQLdb >>> #连接数据库 >>> conn = MySQLdb.connect(host="localhost",user="root",passwd="123123&

  • Python 实现数据库更新脚本的生成方法

    我在工作的时候,在测试环境下使用的数据库跟生产环境的数据库不一致,当我们的测试环境下的数据库完成测试准备更新到生产环境上的数据库时候,需要准备更新脚本,真是一不小心没记下来就会忘了改了哪里,哪里添加了什么,这个真是非常让人头疼.因此我就试着用Python来实现自动的生成更新脚本,以免我这烂记性,记不住事. 主要操作如下: 1.在原先 basedao.py 中添加如下方法,这样旧能很方便的获取数据库的数据,为测试数据库和生产数据库做对比打下了基础. def select_database_stru

  • Mysql 数据库更新错误的解决方法

    语句: UPDATE test SET age=5 WHERE 'name'='王莽' 顾名思义就是把王莽的年龄改为5,但结果很无奈-- 影响行数为0,怎么回事,语法没错,哪里都没问题啊-- 随便改了改了,将name 的两个引号去掉,结果却无意间成功了-- 但是使用图形界面更改时,结果更让人迷惑-- 生成的代码是-- UPDATE `web`.`test` SET `age` = '20' WHERE `test`.`name` = '王莽' LIMIT 1 ; 百思不得其解中-- 通过比对,发

  • 利用python生成一个导出数据库的bat脚本文件的方法

    实例如下: # 环境: python3.x def getExportDbSql(db, index): # 获取导出一个数据库实例的sql语句 sql = 'mysqldump -u%s -p%s -h%s -P%d --default-character-set=utf8 --databases mu_ins_s%s > %s.s%d.mu_ins_%d.sql' %(db['user'], db['pwd'], db['host'], db['port'], index, db['serv

  • Python使用win32com模块实现数据库表结构自动生成word表格的方法

    本文实例讲述了Python使用win32com模块实现数据库表结构自动生成word表格的方法.分享给大家供大家参考,具体如下: 下载win32模块 下载链接:https://sourceforge.net/projects/pywin32/files/pywin32/ 连接mysql import MySQLdb db_host = "" db_port = 3306 db_name = "" db_user = "" db_pwd = &quo

  • 使用python脚本自动生成K8S-YAML的方法示例

    1.生成 servie.yaml 1.1.yaml转json service模板yaml apiVersion: v1 kind: Service metadata: name: ${jarName} labels: name: ${jarName} version: v1 spec: ports: - port: ${port} targetPort: ${port} selector: name: ${jarName} 转成json的结构 { "apiVersion": "

  • Python实现数据库编程方法详解

    本文实例讲述了Python实现数据库编程方法.分享给大家供大家参考.具体分析如下: 用PYTHON语言进行数据库编程, 至少有六种方法可供采用. 我在实际项目中采用,不但功能强大,而且方便快捷.以下是我在工作和学习中经验总结. 方法一:使用DAO (Data Access Objects) 这个第一种方法可能会比较过时啦.不过还是非常有用的. 假设你已经安装好了PYTHONWIN,现在开始跟我上路吧-- 找到工具栏上ToolsàCOM MakePy utilities,你会看到弹出一个Selec

  • python 实现数据库中数据添加、查询与更新的示例代码

    一.前言 最近做web网站的测试,遇到很多需要批量造数据的功能:比如某个页面展示数据条数需要达到10000条进行测试,此时手动构造数据肯定是不可能的,此时只能通过python脚本进行自动构造数据:本次构造数据主要涉及到在某个表里面批量添加数据.在关联的几个表中同步批量添加数据.批量查询某个表中符合条件的数据.批量更新某个表中符合条件的数据等. 二.数据添加 即批量添加数据到某个表中. insert_data.py import pymysql import random import time

  • Python实现一个转存纯真IP数据库的脚本分享

    前言 之前写过很多关于扫描脚本的文章,一直都没写自己的扫描IP段是哪里搞来的,也会有朋友经常来问一些扫描经验,说实话我觉得这个工具并没有实际的技术含量,但是能提高工作效率,就共享出来给大家耍耍- 谈到扫描经验,我个人通常都会针对不同的设备,不同的应用选择不同类型的段. 比如我现在扫描的目标是一款电信光猫,那自然是选择电信的IP段,光猫一般是家庭用户,我们筛选下家庭用户的活跃IP段,这样我们就有针对性了. 再比如我现在想扫一款企业路由设备,那么我就可以选择企业公司多的段. 纯真IP真心是个不错的工

随机推荐