Python探针完成调用库的数据提取

目录
  • 1.简单粗暴的方法--对mysql库进行封装
  • 2.Python的探针
  • 3.制作探针模块
  • 4.直接替换方法
  • 5.总结

1.简单粗暴的方法--对mysql库进行封装

要统计一个执行过程, 就需要知道这个执行过程的开始位置和结束位置, 所以最简单粗暴的方法就是基于要调用的方法进行封装,在框架调用MySQL库和MySQL库中间实现一个中间层, 在中间层完成耗时统计,如:

# 伪代码
def my_execute(conn, sql, param):
 # 针对MySql库的统计封装组件
 with MyTracer(conn, sql, param):
     # 以下为正常使用MySql库的代码
with conn.cursor as cursor:
 cursor.execute(sql, param)
...

看样子实现起来非常不错, 而且更改非常方便, 但由于是在最顶层的API上进行修改, 其实是非常不灵活的, 同时在cursor.execute里会进行一些预操作, 如把sql和param进行拼接, 调用nextset清除当前游标的数据等等。我们最后拿到的数据如时间耗时也是不准确的, 同时也没办法得到一些详细的元数据, 如错误码等等.

如果要拿到最直接有用的数据,就只能去改源代码, 然后再调用源代码了, 但是如果每个库都需要改源代码才能统计, 那也太麻烦了, 好在Python也提供了一些类似探针的接口, 可以通过探针把库的源码进行替换完成我们的代码.

2.Python的探针

在Python中可以通过sys.meta_path来实现import hook的功能, 当执行 import 相关操作时, 会根据sys.meta_path定义的对象对import相关库进行更改.sys.meta_path中的对象需要实现一个find_module方法, 这个find_module方法返回None或一个实现了load_module方法的对象, 我们可以通过这个对象, 针对一些库在import时, 把相关的方法进行替换, 简单用法如下,通过hooktime.sleep让他在sleep的时候能打印消耗的时间.

import importlib
import sys
from functools import wraps
def func_wrapper(func):
    """这里通过一个装饰器来达到狸猫换太子和获取数据的效果"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 记录开始时间
        start = time.time()
        result = func(*args, **kwargs)
        # 统计消耗时间
        end = time.time()
        print(f"speed time:{end - start}")
        return result
    return wrapper
class MetaPathFinder:
    def find_module(self, fullname, path=None):
        # 执行时可以看出来在import哪些模块
        print(f'find module:{path}:{fullname}')
        return MetaPathLoader()
class MetaPathLoader:
    def load_module(self, fullname):
        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder = sys.meta_path.pop(0)
        # 导入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替换函数
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module
sys.meta_path.insert(0, MetaPathFinder())
if __name__ == '__main__':
    import time
    time.sleep(1)
# 输出示例:
# find module:datetime
# find module:time
# load module:time
# find module:math
# find module:_datetime
# speed time:1.00073385238647468

3.制作探针模块

了解完了主要流程后, 可以开始制作自己的探针模块了, 由于示例只涉及到aiomysql模块, 那么在MetaPathFinder.find_module中需要只对aiomysql模块进行处理, 其他的先忽略. 然后我们需要确定我们要把aiomysql的哪个功能给替换, 从业务上来说, 一般情况下我们只要cursor.execute, cursor.fetchone, cursor.fetchall, cursor.executemany这几个主要的操作,所以需要深入cursor看看如何去更改代码, 后者重载哪个函数.

先cursor.execute的源码(cursor.executemanay也类似), 发现会先调用self.nextset的方法, 把上个请求的数据先拿完, 再合并sql语句, 最后通过self._query进行查询:

async def execute(self, query, args=None):
    """Executes the given operation
    Executes the given operation substituting any markers with
    the given parameters.
    For example, getting all rows where id is 5:
        cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
    :param query: ``str`` sql statement
    :param args: ``tuple`` or ``list`` of arguments for sql query
    :returns: ``int``, number of rows that has been produced of affected
    """
    conn = self._get_db()

    while (await self.nextset()):
        pass

    if args is not None:
        query = query % self._escape_args(args, conn)

    await self._query(query)
    self._executed = query
    if self._echo:
        logger.info(query)
        logger.info("%r", args)
    return self._rowcount

再看cursor.fetchone的源码(cursor.fetchall也类似), 发现其实是从缓存中获取数据,

这些数据在执行cursor.execute中就已经获取了:

def fetchone(self):
    """Fetch the next row """
    self._check_executed()
    fut = self._loop.create_future()
    if self._rows is None or self._rownumber >= len(self._rows):
        fut.set_result(None)
        return fut
    result = self._rows[self._rownumber]
    self._rownumber += 1
    fut = self._loop.create_future()
    fut.set_result(result)
    return fut

综合上面的分析, 我们只要对核心的方法self._query进行重载即可拿到我们要的数据, 从源码中我们可以知道, 我们能获取到传入self._query的self和sql参数, 根据self又能获取到查询的结果, 同时我们通过装饰器能获取到运行的时间, 要的数据基本都到齐了,

按照思路修改后的代码如下:

import importlib
import time
import sys
from functools import wraps
from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
    import aiomysql
def func_wrapper(func: Callable):
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        start: float = time.time()
        func_result: Any = await func(*args, **kwargs)
        end: float = time.time()
        # 根据_query可以知道, 第一格参数是self, 第二个参数是sql
        self: aiomysql.Cursor = args[0]
        sql: str = args[1]
        # 通过self,我们可以拿到其他的数据
        db: str = self._connection.db
        user: str = self._connection.user
        host: str = self._connection.host
        port: str = self._connection.port
        execute_result: Tuple[Tuple] = self._rows
        # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了,
        # 这里只是打印一部分数据出来
        print({
            "sql": sql,
            "db": db,
            "user": user,
            "host": host,
            "port": port,
            "result": execute_result,
            "speed time": end - start
        })
        return func_result
    return cast(Callable, wrapper)
class MetaPathFinder:

    @staticmethod
    def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
        if fullname == 'aiomysql':
            # 只有aiomysql才进行hook
            return MetaPathLoader()
        else:
            return None
class MetaPathLoader:
    @staticmethod
    def load_module(fullname: str):
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder: "MetaPathFinder" = sys.meta_path.pop(0)
        # 导入 module
        module: ModuleType = importlib.import_module(fullname)
        # 针对_query进行hook
        module.Cursor._query = func_wrapper(module.Cursor._query)
        sys.meta_path.insert(0, finder)
        return module
async def test_mysql() -> None:
    import aiomysql
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

if __name__ == '__main__':
    sys.meta_path.insert(0, MetaPathFinder())
    import asyncio
    asyncio.run(test_mysql())
# 输出示例:
# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}

这个例子看来很不错, 但是需要在调用的入口处显式调用该逻辑, 通常一个项目可能有几个入口, 每个入口都显示调用该逻辑会非常麻烦, 而且必须先调用我们的hook逻辑后才能import, 这样就得订好引入规范, 不然就可能出现部分地方hook不成功, 如果能把引入hook这个逻辑安排在解析器启动后马上执行, 就可以完美地解决这个问题了. 查阅了一翻资料后发现,python解释器初始化的时候会自动import PYTHONPATH下存在的sitecustomize和usercustomize模块, 我们只要创建该模块, 并在模块里面写入我们的 替换函数即可。

.
├── __init__.py
├── hook_aiomysql.py
├── sitecustomize.py
└── test_auto_hook.py

hook_aiomysql.py是我们制作探针的代码为例子, 而sitecustomize.py存放的代码如下, 非常简单, 就是引入我们的探针代码, 并插入到sys.meta_path:

import sys
from hook_aiomysql import MetaPathFinder
sys.meta_path.insert(0, MetaPathFinder())

test_auto_hook.py则是测试代码:

import asyncio
from hook_aiomysql import test_mysql
asyncio.run(test_mysql())

接下来只要设置PYTHONPATH并运行我们的代码即可(如果是项目的话一般交由superisor启动,则可以在配置文件中设置好PYTHONPATH):

(.venv) ➜  python_hook git:(master) ✗ export PYTHONPATH=.
(.venv) ➜  python_hook git:(master) ✗ python test_auto_hook.py
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}

4.直接替换方法

可以看到上面的方法很好的运行了, 而且可以很方便的嵌入到我们的项目中, 但是依赖与sitecustomize.py文件很难让他抽离成一个第三方的库, 如果要抽离成第三方的库就得考虑看看有没有其他的方法。在上面介绍MetaPathLoader时说到了sys.module, 在里面通过sys.modules来减少重复引入:

class MetaPathLoader:
    def load_module(self, fullname):
        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder = sys.meta_path.pop(0)
        # 导入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替换函数
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module

这个减少重复引入的原理是, 每次引入一个模块后, 他就会存放在sys.modules, 如果是重复引入, 就会直接刷新成最新引入的模块。上面之所以会考虑到减少重复import是因为我们不会在程序运行时升级第三方库的依赖。利用到我们可以不考虑重复引入同名不同实现的模块, 以及sys.modules会缓存引入模块的特点, 我们可以把上面的逻辑简化成引入模块->替换当前模块方法为我们修改的hook方法。

import time
from functools import wraps
from typing import Any, Callable, Tuple, cast
import aiomysql
def func_wrapper(func: Callable):
    """和上面一样的封装函数, 这里简单略过"""
# 判断是否hook过
_IS_HOOK: bool = False
# 存放原来的_query
_query: Callable = aiomysql.Cursor._query
# hook函数
def install_hook() -> None:
    _IS_HOOK = False
    if _IS_HOOK:
        return
    aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)
    _IS_HOOK = True
# 还原到原来的函数方法
def reset_hook() -> None:
    aiomysql.Cursor._query = _query
    _IS_HOOK = False

代码简单明了,接下来跑一跑刚才的测试:

import asyncio
import aiomysql
from demo import install_hook, reset_hook
async def test_mysql() -> None:
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

print("install hook")
install_hook()
asyncio.run(test_mysql())
print("reset hook")
reset_hook()
asyncio.run(test_mysql())
print("end")

通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息

install hook
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}
reset hook
end

5.总结

得益于Python动态语言的特性, 我们可以很容易的为第三方库实现钩子方法,上面说的两种方法中, 第二种方法非常简单, 但在自己项目中最好还是采用第一种方法, 因为Python是通过一行一行代码进行扫描执行的, 第二种方法只能放在入口代码中, 并且要在被hook的对象实例化之前执行, 不然就会实现hook失败的现象, 而第一种方法除了麻烦外,基本上能躲避所有坑。

到此这篇关于Python探针完成调用库的数据提取的文章就介绍到这了,更多相关 Python探针 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python数据提取-lxml模块

    知识点: 了解lxml模块和xpath语法的关系: 了解lxml模块的使用场景: 了解lxml模块的安装: 了解 谷歌浏览器xpath helper插件的安装和使用: 掌握xpath语法-基础节点选择语法: 掌握 xpath语法 -节点修饰语法: 掌握xpath语法 - 其他常用语法: 掌握 lmxl模块中使用xpath语法定位元素提取数学值或文本内容: 掌握lxml模块etree.tostring函数的使用: 1.了解lxml模块和xpath语法 对html或xml形式的文本提取特定的内容,就

  • 利用python对Excel中的特定数据提取并写入新表的方法

    最近刚开始学python,正好实习工作中遇到对excel中的数据进行处理的问题,就想到利用python来解决,也恰好练手. 实际的问题是要从excel表中提取日期.邮件地址和时间,然后统计在一定时间段内某个人在某个项目上用了多少时间,最后做成一张数据透视表(这是问题的大致意思). 首先要做的就是数据提取了,excel中本身有一个text to column的功能,但是对列中规律性不好的数据处理效果很差,不能分割出想要的数据,所以我果断选择用python来完成. 要用的库一个是对excel读写处理

  • python 将json数据提取转化为txt的方法

    如下所示: #-*- coding: UTF-8 -*- import json import pymysql import os import sys # 数据类型 # { # "name": "score.networkQuality", # "index": true, # "view": "app/views/score/networkQuality.tmpl.html", # "file

  • Python进行数据提取的方法总结

    准备工作 首先是准备工作,导入需要使用的库,读取并创建数据表取名为loandata. import numpy as np import pandas as pd loandata=pd.DataFrame(pd.read_excel('loan_data.xlsx')) 设置索引字段 在开始提取数据前,先将member_id列设置为索引字段.然后开始提取数据. Loandata = loandata.set_index('member_id') 按行提取信息 第一步是按行提取数据,例如提取某个

  • python 数据提取及拆分的实现代码

    K线数据提取 依据原有数据集格式,按要求生成新表: 1.每分钟的close数据的第一条.最后一条.最大值及最小值, 2.每分钟vol数据的增长量(每分钟vol的最后一条数据减第一条数据) 3.汇总这些信息生成一个新表 (字段名:['time','open','close','high','low','vol']) import pandas as pd import time start=time.time() df=pd.read_csv('data.csv') df=df.drop('id'

  • Python 探针的实现原理

    探针的实现主要涉及以下几个知识点: sys.meta_path sitecustomize.py sys.meta_path sys.meta_path 这个简单的来说就是可以实现 import hook 的功能, 当执行 import 相关的操作时,会触发 sys.meta_path 列表中定义的对象. 关于 sys.meta_path 更详细的资料请查阅 python 文档中 sys.meta_path 相关内容以及 PEP 0302 . sys.meta_path 中的对象需要实现一个 f

  • Python探针完成调用库的数据提取

    目录 1.简单粗暴的方法--对mysql库进行封装 2.Python的探针 3.制作探针模块 4.直接替换方法 5.总结 1.简单粗暴的方法--对mysql库进行封装 要统计一个执行过程, 就需要知道这个执行过程的开始位置和结束位置, 所以最简单粗暴的方法就是基于要调用的方法进行封装,在框架调用MySQL库和MySQL库中间实现一个中间层, 在中间层完成耗时统计,如: # 伪代码 def my_execute(conn, sql, param): # 针对MySql库的统计封装组件 with M

  • 推荐技术人员一款Python开源库(造数据神器)

    1. 背景 在软件需求.开发.测试过程中,有时候需要使用一些测试数据,针对这种情况,我们一般要么使用已有的系统数据,要么需要手动制造一些数据.由于现在的业务系统数据多种多样,千变万化.在手动制造数据的过程中,可能需要花费大量精力和工作量,此项工作既繁复又容易出错,比如要构造一批用户三要素(姓名.手机号.身份证).构造一批银行卡数据.或构造一批地址通讯录等. 这时候,人们常常为了偷懒快捷,测试数据大多数可能是类似这样子的: 测试, 1300000 000123456 张三, 1310000 000

  • Python库 Bokeh 数据可视化实用指南

    目录 什么是 Bokeh 在哪使用 Bokeh 图 安装Bokeh库 导入Bokeh库 绘制图表的语法 使用Bokeh库主题 图表样式 Python 中的 Bokeh用例 数据 数据说明 饼形图 圆环图 散点图 简单直方图 堆积直方图 不同类型的条形图 简单条形图 堆积条形图 堆积垂直条形图 双向条形图 折线图 棒棒糖图表 面积图 Bokeh库的布局功能 技术交流 什么是 Bokeh Bokeh 是 Python 中的交互式可视化库.Bokeh提供的最佳功能是针对现代 Web 浏览器进行演示的高

  • Python调用Prometheus监控数据并计算

    目录 Prometheus是什么 Prometheus基础概念 什么是时间序列数据 什么是targets(目标) 什么是metrics(指标) 什么是PromQL(函数式查询语言) 如何监控远程Linux主机 Prometheus HTTP API 支持的 API 认证方法 数据返回格式 数据写入 监控数据查询 什么是Grafana 工作使用场景 CPU峰值计算 CPU均值计算 内存峰值计算 内存均值计算 导出excel 参考链接: Prometheus是什么 Prometheus是一套开源监控

  • python每5分钟从kafka中提取数据的例子

    我就废话不多说了,直接上代码吧! import sys sys.path.append("..") from datetime import datetime from utils.kafka2file import KafkaDownloader import os """ 实现取kafka数据,文件按照取数据的间隔命名 如每5分钟从kafka取数据写入文件中,文件名为当前时间加5 """ TOPIC = "rtz

  • 在Python IDLE 下调用anaconda中的库教程

    大家都知道,Anaconda是一个开源的Python发行版本,其包含了conda.Python等180多个科学包及其依赖项.下载了anaconda我们可以很方便的随时调用这里面的库. 原先我自己在Python官网下载了python 3.7开发环境,anaconda的后面下载的,平时比较喜欢使用 IDLE 作简单的程序或学习的时候,发现调用不了anaconda中的库,就算是在cmd程序中使用pip 下载相应的库时,最终的库路径也是存于anaconda的库路径中. 当然,通过相关命令实现pip下载路

随机推荐