Python运行时修改业务SQL代码

目录
  • 前记
  • 1.缘起
  • 2.侵入库
  • 3.获取商户ID
  • 4.修改SQL

前记

在项目的演变过程中,有时可能会诞生一些需要奇怪的临时需求,这些需求会涉及到所有的SQL,但开发时间上却不允许整个项目的所有SQL进行重写,比如控制不同的人访问表的权限,或者是我面对的SASS化需求,这时就需要在运行时根据对应的条件来修改SQL语句。

1.缘起

最近项目在准备搞SASS化,SASS化有一个特点就是多租户,且每个租户之间的数据都要隔离,对于数据库的隔离方案常见的有数据库隔离,表隔离,字段隔离,目前我只用到表隔离和字段隔离(数据库隔离的原理也是差不多)。 对于字段隔离比较简单,就是查询条件不同而已,比如像下面的SQL查询:

SELECT * FROM t_demo WHERE tenant_id='xxx' AND is_del=0

但是为了严谨,需求上需要在执行SQL之前检查对应的表是否带上tenant_id的查询字段。

对于表隔离就麻烦了一些,他需要做到在运行的时候根据对应的租户ID来处理某个数据表,举个例子,假如有下面这样的一条SQL查询:

SELECT * FROM t_demo WHERE is_del=0

在遇到租户A时,SQL查询将变为:

SELECT * FROM t_demo_a WHERE is_del=0

在遇到租户B时,SQL查询将变为:

SELECT * FROM t_demo_b WHERE is_del=0

如果商户数量固定时,一般在代码里编写if-else来判断就可以了,但是常见的SASS化应用的商户是会一直新增的,那么对于这个SQL逻辑就会变成这样:

def sql_handle(tenant_id: str):
    table_name: str = f"t_demo_{tenant_id}"
    sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"

但是这有几个问题,对于ORM来说,一开始只创建一个t_demo对应的表对象就可以了,现在却要根据多个商户创建多个表对象,这是不现实的,其次如果是裸写SQL,一般会使用IDE的检查,而对于这样的SQL:

sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"

IDE是没办法进行检查的,当然还有一个最为严重的问题,就是当前的项目已经非常庞大了,如果每个相关表的调用都进行适配更改的话,那工程量就非常庞大了,所以最好的方案就是在引擎库得到用户传过来的SQL语句后且还没发送到MySQL服务器之前自动的根据商户ID更改SQL, 而要达到这样的效果,就必须侵入到我们使用的MySQL的引擎库,修改里面的方法来兼容我们的需求。

不管是使用dbutils还是sqlalchemy,都可以指定一个引擎库,目前常用的引擎库是pymysql,所以下文都将以pymysql为例进行阐述。

2.侵入库

由于必须侵入到我们使用的引擎库,所以我们应该先判断我们需要修改引擎库的哪个方法,在经过源码阅读后,我判定只要更改pymysql.cursors.Cursormogrify方法:

def mogrify(self, query, args=None):
    """
    Returns the exact string that is sent to the database by calling the
    execute() method.

    This method follows the extension to the DB API 2.0 followed by Psycopg.
    """
    conn = self._get_db()

    if args is not None:
        query = query % self._escape_args(args, conn)
    return query

这个方法的作用就是把用户传过来的SQL和参数进行整合,生成一个最终的SQL,刚好符合我们的需求,于是可以通过继承的思路来创建一个新的属于我们自己的Cursor类:

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        # 在此可以编写处理还合成的SQL逻辑
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql
class DictCursor(pymysql.cursors.DictCursorMixin, Cursor):
    """A cursor which returns results as a dictionary"""
    # 直接修改Cursor类的`mogrify`方法并不会影响到`DictCursor`类,所以我们也要创建一个新的`Cursor`类。

创建好了Cursor类后,就需要考虑如何在pymysql中应用我们自定义的Cursor类了,一般的Mysql连接库都支持我们传入自定义的Cursor类,比如pymysql:

import pymysql.cursors
# Connect to the database
connection = pymysql.connect(
    host='localhost',
    user='user',
    password='passwd',
    database='db',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

我们可以通过cursorclass来指定我们的Cursor类,如果使用的库不支持或者是其它原因则需要使用猴子补丁的方法,具体的使用方法见Python探针完成调用库的数据提取。

3.获取商户ID

现在我们已经搞定了在何处修改SQL的问题了,接下来就要思考如何在mogrify方法获取到商户ID以及那些表要进行替换,一般我们在进行一段代码调用时,有两种传参数的方法, 一种是传数组类型的参数:

with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))

一种是传字典类型的参数:

with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%(is_del)s", {"is_del": 0})

目前大多数的项目都存在这两种类型的编写习惯,而引擎库在执行execute时会经过处理后才把参数sqlargs传给了mogrify,如果我们是使用字典类型的参数,那么可以在里面嵌入我们需要的参数,并在mogrify里面提取出来,但是使用了数组类型的参数或者是ORM库的话就比较难传递参数给mogrify方法了,这时可以通过context隐式的把参数传给mogrify方法,具体的分析和原理可见:python如何使用contextvars模块源码分析。

context的使用方法很简单, 首先是创建一个context封装的类:

from contextvars import ContextVar, Token
from typing import Any, Dict, Optional, Set
context: ContextVar[Dict[str, Any]] = ContextVar("context", default={})
class Context(object):
    """基础的context调用,支持Type Hints检查"""
    tenant_id: str
    replace_table_set: Set[str]
    def __getattr__(self, key: str) -> Any:
        value: Any = context.get().get(key)
        return value
    def __setattr__(self, key: str, value: Any) -> None:
        context.get()[key] = value
class WithContext(Context):
    """简单的处理reset token逻辑,和context管理,只用在业务代码"""
    def __init__(self) -> None:
        self._token: Optional[Token] = None

    def __enter__(self) -> "WithContext":
        self._token = context.set({})
        return self
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if self._token:
            context.reset(self._token)
            self._token = None

接下来在业务代码中,通过context传入当前业务对应的参数:

with WithContext as context:
    context.tenant_id = "xxx"
    context.replace_table_set = {"t_demo"}
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))

然后在mogrify中通过调用context即可获得对应的参数了:

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # 在此可以编写处理还合成的SQL逻辑
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql

4.修改SQL

现在,万事俱备,只剩下修改SQL的逻辑,之前在做别的项目的时候,建的表都是十分的规范,它们是以t_xxx的格式给表命名,这样一来替换表名十分方便,只要进行两次替换就可以兼容大多数情况了,代码如下:

import pymysql
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # 简单示例,实际上正则的效率会更好
        for replace_table in replace_table_set:
            if replace_table in query:
                # 替换表名
                query = query.replace(f" {replace_table} ", f" {replace_table}_{tenant_id} ")
                # 替换查询条件中带有表名的
                query = query.replace(f" {replace_table}.", f" {replace_table}_{tenant_id}.")
        mogrify_sql: str = super().mogrify(query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql

但是现在项目的SQL规范并不是很好,有些表名还是MySQL的关键字,所以靠简单的替换是行不通的,同时这个需求中,一些表只需要字段隔离,需要确保有带上对应的字段查询,这就意味着必须有一个库可以来解析SQL,并返回一些数据使我们可以比较方便的知道SQL中哪些是表名,哪些是查询字段了。

目前在Python中有一个比较知名的SQL解析库--sqlparse,它可以通过解析引擎把SQL解析成一个Python对象,之后我们就可以通过一些语法来判断哪些是SQL关键字, 哪些是表名,哪些是查询条件等等。但是这个库只实现一些底层的API,我们需要对他和SQL比较了解之后才能实现一些比较完备的功能,比如下面3种常见的SQL:

SELECT * FROM t_demo
SELECT * FROM t_demo as demo
SELECT * FROM t_other as other LEFT JOIN t_demo demo on demo.xxx==other.xxx

如果我们要通过sqlparse来提取表名的话就需要处理这3种情况,而我们如果要每一个情况都编写出来的话,那将会非常费心费力,同时也可能存在遗漏的情况,这时就需要用到另外一个库--sql_metadata,这个库是基于sqlparse和正则的解析库,同时提供了大量的常见使用方法的封装,我们通过直接调用对应的函数就能知道SQL中有哪些表名,查询字段是什么了。

目前已知这个库有一个缺陷,就是会自动去掉字段的符号, 比如表名为关键字时,我们需要使用`符号把它包起来:

SELECT * FROM `case`

但在经过sql_metadata解析后得到的表名是case而不是`case`,需要人为的处理,但是我并不觉得这是一个BUG,自己不按规范创建表,能怪谁呢。

接下来就可以通过sql_metadata的方法来实现我需要的功能了,在根据需求修改后,代码长这样(说明见注释):

from typing import Dict, Set, Tuple, Union
import pymysql
import sql_metadata
class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        # 生成一个解析完成的SQL对象
        sql_parse: sql_metadata.Parser = sql_metadata.Parser(query)

        # 新加的一个属性,这里存下需要校验查询条件的表名
        check_flag = False
        where_table_set: Set[str] = context.where_table_set
        # 该方法会获取到SQL对应的table,返回的是一个table的数组
        for table_name in sql_parse.tables:
            if table_name in where_table_set:
                if sql_parse.columns_dict:
                    # 该方法会返回SQL对应的字段,其中分为select, join, where等,这里只用到了where
                    for where_column in sql_parse.columns_dict.get("where", []):
                        # 如果连表,里面存的是类似于t_demo.tenant_id,所以要兼容这一个情况
                        if "tenant_id" in where_column.lower().split("."):
                            check_flag = True
                            break
        if not check_flag:
            # 检查不通过就抛错
            raise RuntimeError()
        # 更换表名的逻辑
        replace_table_set: Set[str] = context.replace_table_set
        new_query: str = query
        for table_name in sql_parse.tables:
            if table_name in replace_table_set:
                new_query = ""
                # tokens存放着解析完的数据,比如SELECT * FROM t_demo解析后是
                # [SELECT, *, FROM, t_demo]四个token
                for token in sql_parse.tokens:
                    # 判断token是否是表名
                    if token.is_potential_table_name:
                        # 提取规范的表名
                        parse_table_name: str = token.stringified_token.strip()
                        if parse_table_name in replace_table_set:
                            new_table_name: str = f" {parse_table_name}_{tenant_id}"
                            # next_token代表SQL的下一个字段
                            if token.next_token.normalized != "AS":
                                # 如果当前表没有设置别名
                                # 通过AS把替换前的表名设置为新表名的别名,这样一来后面的表名即使没进行更改,也是能读到对应商户ID的表
                                new_table_name += f" AS {parse_table_name}"
                            query += new_table_name
                            continue
                    # 通过stringified_token获取的数据会自动带空格,比如`FROM`得到的会是` FROM`,这样拼接的时候就不用考虑是否加空格了
                    new_query += token.stringified_token
        mogrify_sql: str = super().mogrify(new_query, args)
        # 在此可以编写处理合成后的SQL逻辑
        return mogrify_sql

这份代码十分简单,它只做简单介绍,事实上这段逻辑会应用到所有的SQL查询中,我们应该要保证这段代码是没问题的,同时不要有太多的性能浪费,所以在使用的时候要考虑到代码拆分和优化。 比如在使用的过程中可以发现,我们的SQL转换和检查都是在父类的Cursor.mogrify之前进行的,这就意味着不管我们代码逻辑里cursor.execute传的参数是什么,对于同一个代码逻辑来说,传过来的query值是保持不变的,比如下面的代码:

def get_user_info(uid: str) -> Dict[str, Any]:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_user WHERE uid=%(uid)s", {"uid": uid})
        return cursor.fetchone() or {}

这段代码中传到Cursor.mogrify的query永远为SELECT * FROM t_user WHERE uid=%(uid)s,有变化的只是args中uid的不同。 有了这样的一个前提条件,那么我们就可以把query的校验结果和转换结果缓存下来,减少每次都需要解析SQL再校验造成的性能浪费。至于如何实现缓存则需要根据自己的项目来决定,比如项目中只有几百个SQL执行,那么直接用Pythondict来存放就可以了,如果项目中执行的SQL很多,同时有些执行的频率非常的高,有些执行的频率非常的低,那么可以考虑使用LRU来缓存。

到此这篇关于Python运行时修改业务SQL代码的文章就介绍到这了,更多相关Python 修改代码内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python mysql自增字段AUTO_INCREMENT值的修改方式

    在之前得文章中我们说过,如果使用delete对数据库中得表进行删除,那么只是把记录删除掉,并且id的值还会保持上次的状态. 即删除之前如果有四条数据,删除之后,再添加新的数据,id怎会从5开始. 但是我们显示想让id从2开始,应该怎么做呢? 这个时候我们就要学习去修改数据表的一些属性值了,而这个属性值就是AUTO_INCREMENT. 首先我们要知道怎么查看这个属性的值. 例如我建了一张表: create table t4(id int auto_increment primary key, n

  • python操作数据库之sqlite3打开数据库、删除、修改示例

    复制代码 代码如下: #coding=utf-8__auther__ = 'xianbao'import sqlite3# 打开数据库def opendata():        conn = sqlite3.connect("mydb.db")        cur = conn.execute("""create table if not exists tianjia(id integer primary key autoincrement, user

  • 利用Python如何批量修改数据库执行Sql文件

    前言 由于上篇文章中批量修改了文件,有的时候数据库也需要批量修改一下,之前的做法是使用宝塔的phpMyAdmin导出一个已经修改好了的sql文件,然后依次去其他数据库里导入,效率不说极低,也算低了,且都是些重复性的劳动,所以打算用Python来批量执行sql 环境 版本:Python3.6 系统:MacOS IDE:PyCharm 第三方库:pymysql Show Code import pymysql host = 'xxx.65.9.191' username = 'root' passw

  • Python运行时修改业务SQL代码

    目录 前记 1.缘起 2.侵入库 3.获取商户ID 4.修改SQL 前记 在项目的演变过程中,有时可能会诞生一些需要奇怪的临时需求,这些需求会涉及到所有的SQL,但开发时间上却不允许整个项目的所有SQL进行重写,比如控制不同的人访问表的权限,或者是我面对的SASS化需求,这时就需要在运行时根据对应的条件来修改SQL语句. 1.缘起 最近项目在准备搞SASS化,SASS化有一个特点就是多租户,且每个租户之间的数据都要隔离,对于数据库的隔离方案常见的有数据库隔离,表隔离,字段隔离,目前我只用到表隔离

  • 在Python运行时动态查看进程内部信息的方法

    接前两篇"运行时查看线程信息"的博客,我在想,既然我可以随时打印线程信息,那么我是不是可以随时打印进程内部的其它信息呢?比如,实时查看一些对象属性等,这样可以帮助我们在不重新启动应用程序的情况下就可以观察进程的执行状态.(这里暂时不考虑那些使用第三方库或工具的情况) 根据这个想法,查看了一下python的动态加载模块的方法,感觉这个想法还是比较靠谱,应该可以实现,所以动手写了个小测试验证了一把.(这里说明一下,只是验证性的,生产环境要使用的话,还是有不少问题需要考虑的.) 下面就是测试

  • python运行时强制刷新缓冲区的方法

    需求:打印一颗"*"休息1s 代码如下: #!/usr/bin/python #coding=utf-8 ''' 暂停1s输出 ''' import time def printStar(n): for i in range(n): print " * ", time.sleep(1) if __name__ == '__main__': printStar(10) 输出结果(等待10s后一次性输出): [root@miner_k test]# python sle

  • java注解之运行时修改字段的注解值操作

    今天遇到需求:导入Excel时候列头会发生变化,客户是大爷要求你改代码, 导入Excel是用easypoi做的,识别表头是用注解@Excel(name = "xxx")通过这个name来匹配 那你表头要动,我这个注解是硬编码 所以就有动态设置这个表头 public class JavaVo{ @Excel(name = "xxx") private String userName; //省略getset方法 } ExcelImportUtil.importExcel

  • 在运行时编辑代码的 .NET 热重载的操作方法

    今天,我们很高兴向你介绍 Visual Studio 2019 中 16.11(预览版1)中的 .NET 热重载(通过 .NET 6(预览版4)中的 dotnet watch 命令行工具).在这篇文章的其余部分,我们会介绍什么是 .NET 热重载,您如何开始使用这个特性,我们对未来计划改进的设想,以及目前支持哪种编辑和语言的明确性. 今天,我们很高兴向你介绍 Visual Studio 2019 中 16.11(预览版1)中的 .NET 热重载(通过 .NET 6(预览版4)中的 dotnet

  • 使用memory_profiler监测python代码运行时内存消耗方法

    前几天一直在寻找能够输出python函数运行时最大内存消耗的方式,看了一堆的博客和知乎,也尝试了很多方法,最后选择使用memory_profiler中的mprof功能来进行测量的,它的原理是在代码运行过程中每0.1S统计一次内存,并生成统计图. 具体的使用方式如下: 首先安装memory_profiler和psutil(psutil主要用于提高memory_profile的性能,建议安装)(可使用pip直接安装) pip install memory_profiler pip install p

  • Android Studio使用Kotlin时,修改代码后运行不生效的解决方法

    问题现象 前段时间升级 Android Studio 3.1.3+ 版本后,决定尝试使用 Kotlin 做 APP 开发看看.结果却发现,修改 String 资源后,"运行",修改的内容没有生效.一开始以为只是 String 资源是这样,于是试了下 kt 文件,结果发现"运行"也不能生效. 但是先 clean 了,再"运行",却可以正常编译出来.查了好久发现是 New Module 后,Run/Debug Configurations不完整所致.

  • Python退出时强制运行一段代码的实现方法

    设想这样一个场景,你要给一个项目开发测试程序,程序开始运行的时候,会创建初始环境,测试完成以后,会清理环境. 这段逻辑本身非常简单: setup() test() clean() 但由于测试的代码比较复杂,你总是在调试的时候程序异常,导致每次clean()函数还没有来得及运行,程序就崩溃了. 你可能想到,如果这样写会怎么样呢: setup() try: text() except Exception as e: print('运行异常:', e) clean() 似乎看起来,程序一定会运行到cl

  • 关于python selenium 运行时弹出窗口问题

    近期在做一个网页代填项目时,用到了python的selenium,虽然实现了代填功能但是每次运行时都会弹出窗口,初始是python窗口,后续改进了又弹出了driver的窗口.在我看来是无伤大雅的,不过测试不接受,只能改,经过了各种尝试与搜索最后终算是较完美的解决了. 去除python窗口 项目初始是通过C++的process去调起python然后执行脚本的,后来发现会弹出python窗口. 使用的命令为 python.exe ie.py 效果如下 打开了页面但是同时会出现一个python窗口.

  • 命令行运行Python脚本时传入参数的三种方式详解

    如果在运行python脚本时需要传入一些参数,例如gpus与batch_size,可以使用如下三种方式. python script.py 0,1,2 10 python script.py -gpus=0,1,2 --batch-size=10 python script.py -gpus=0,1,2 --batch_size=10 这三种格式对应不同的参数解析方式,分别为sys.argv, argparse, tf.app.run, 前两者是python自带的功能,最后一个是tensorfl

随机推荐