Python实现实时增量数据加载工具的解决方案

目录
  • 创建增量ID记录表
  • 数据库连接类
  • 增量数据服务客户端
  • 结果测试

本次主要分享结合单例模式实际应用案例:实现实时增量数据加载工具的解决方案。最关键的是实现一个可进行添加、修改、删除等操作的增量ID记录表。

单例模式:提供全局访问点,确保类有且只有一个特定类型的对象。通常用于以下场景:日志记录或数据库操作等,避免对用一资源请求冲突。

创建增量ID记录表

import sqlite3
import datetime
import pymssql
import pandas as pd
import time
pd.set_option('expand_frame_repr', False)

导入所需模块

 # 创建数据表
database_path = r'.\Database\ID_Record.db'
from sqlite3 import connect

with connect(database_path) as conn:
    conn.execute(
        'CREATE TABLE IF NOT EXISTS Incremental_data_max_id_record(id INTEGER PRIMARY KEY AUTOINCREMENT,F_SDaqID_MAX TEXT,record_date datetime)')

增量最新记录ID-F_SDaqID_MAX数据库存储

#数据保存到本地txt
def text_save(filename, record):#filename为写入txt文件的路径,record为要写入F_SDaqID_MAX、record_date数据列表.
    file = open(filename,'a') 追加方式
    # file = open(filename, 'w')  #覆盖方式
    for i in range(len(record)):
        s = str(record[i]).replace('[','').replace(']','')
        s = s.replace("'",'').replace(',','') +'\n'   #去除单引号,逗号,每行末尾追加换行符
        file.write(s)
    file.close()

增量最新记录ID-F_SDaqID_MAX临时文件存储

增量ID记录提供了两种实现方案 ,一个是数据持久化存储模式,另一个是临时文件存储模式。数据持久化模式顾名思义,也就是说在创建对象的时候,能将操作关键信息如增量ID-F_SDaqID_MAX记录下来,这种flag记录映射是常选择的设计模式。

数据库连接类

实现实时增量数据获取需要实现两个数据库连接类:增量数据ID存储类和增量目标数据源类。这里利用单例模式实现数据库操作类,将增量服务记录信息按照顺序存储到数据库或特定的日志文件中,以维护数据的一致性。

1、增量数据ID存储sqlite连接类代码

class Database_sqlite(metaclass=MetaSingleton):
    database_path = r'.\Database\energy_rc_configure.db'
    connection = None
    def connect(self):
        if self.connection is None:
            self.connection = sqlite3.connect(self.database_path,check_same_thread=False,isolation_level=None)
            self.cursorobj =  self.connection.cursor()
        return self.cursorobj,self.connection

    # 插入最大记录
    @staticmethod
    def Insert_Max_ID_Record(f1, f2):

        cursor = Database_sqlite().connect()
        print(cursor)

        sql = f"""insert into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values("{f1}","{f2}")"""
        cursor[0].execute(sql)

        # sql = "insert  into Incremental_data_max_id_record(F_SDaqID_MAX,record_date) values(?,?)"
        # cursor[0].execute(sql,(f"{f1}",f"{f2}"))

        cursor[1].commit()
        print("插入成功!")
        # cursor[0].close()
        return 

    # 取出增量数据库中最新一次ID记录
    @staticmethod
    def View_Max_ID_Records():

        cursor = Database_sqlite().connect()
        sql = "select max(F_SDaqID_MAX) from Incremental_data_max_id_record"
        cursor[0].execute(sql)
        results = cursor[0].fetchone()[0]
        # #单例模式不用关闭数据库连接
        # cursor[0].close()
        print("最新记录ID", results)
        return results

    #删除数据记录ID
    @staticmethod
    def Del_Max_ID_Records():
        cursor = Database_sqlite().connect()
        sql = "delete from Incremental_data_max_id_record where record_date = (select MAX(record_date) from Incremental_data_max_id_record)"
        cursor[0].execute(sql)
        # results = cursor[0].fetchone()[0]
        # # cursor[0].close()
        cursor[1].commit()
        print("删除成功")
        return

2、增量数据源sqlserver连接类代码

class Database_sqlserver(metaclass=MetaSingleton):
    """
    #实时数据库
    """
    connection = None

    # def connect(self):
    def __init__(self):
        if self.connection is None:
            self.connection = pymssql.connect(host="xxxxx",user="xxxxx",password="xxxxx",database="xxxxx",charset="utf8")
            if self.connection:
                print("连接成功!")
            # 打开数据库连接
            self.cursorobj = self.connection.cursor()
        # return self.cursorobj, self.connection

    # 获取数据源中最大ID
    @staticmethod
    def get_F_SDaqID_MAX():
        # cursor_insert = Database_sqlserver().connect()
        cursor_insert = Database_sqlserver().cursorobj

        sql_MAXID = """select MAX(F_SDaqID) from T_DaqDataForEnergy"""

        cursor_insert.execute(sql_MAXID)  # 执行查询语句,选择表中所有数据

        F_SDaqID_MAX = cursor_insert.fetchone()[0]  # 获取记录

        print("最大ID值:{0}".format(F_SDaqID_MAX))

        return F_SDaqID_MAX

    # 提取增量数据
    @staticmethod
    def get_incremental_data(incremental_Max_ID):
        # 开始获取增量数据
        sql_incremental_data = """select F_ID,F_Datetime,F_Data from T_DaqDataForEnergy  where F_ID > {0}""".format(
            incremental_Max_ID)

        # cursor_find = Database_sqlserver().connect()
        cursor_find = Database_sqlserver().cursorobj

        cursor_find.execute(sql_incremental_data)  # 执行查询语句,选择表中所有数据

        Target_data_source = cursor_find.fetchall()  # 获取所有数据记录

        # cursor_find.close()
        cursor_find.close()

        df = pd.DataFrame(
            Target_data_source,
            columns=[
                "F_ID",
                "F_Datetime",
                "F_Data"])

        print("提取数据", df)
        return df

数据资源应用服务设计主要考虑数据库操作的一致性和优化数据库的各种操作,提高内存或CPU利用率。

实现多种读取和写入操作,客户端操作调用API,执行相应的DB操作。

注:

1、使用metaclass实现创建具有单例特征的类

Database_sqlserver(metaclass=MetaSingleton)

Database_sqlite(metaclass=MetaSingleton)

使用class定义新类时,数据库类Database_sqlserver由MetaSingleton装饰后即指定了metaclass,那么MetaSingleton的特殊方法__call__方法将自动执行。

class MetaSingleton(type):
    _instances={}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)
        return cls._instances[cls]

以上代码基于元类的单例实现,当客户端对数据库执行某些操作时,会多次实例化数据库类,但是只创建一个对象,所以对数据库的调用是同步的。

2、多线程使用同一数据库连接资源需采取一定同步机制

如果没采用同步机制,可能出现一些意料之外的情况

1)with cls.lock加锁

class MetaSingleton(type):
    _instances={}
    lock = threading.Lock()
    def __call__(cls, *args, **kwargs):
        with cls.lock:
            if cls not in cls._instances:
                time.sleep(0.05)  #模拟耗时
                cls._instances[cls] = super(MetaSingleton,cls).__call__(*args,**kwargs)
            return cls._instances[cls]

锁的创建和释放需要消耗资源,上面代码每次创建都必须获得锁。

3、如果我们开发的程序非单个应用,而是集群化的,即多个客户端共享单个数据库,导致数据库操作无法同步,而数据库连接池是更好的选择。大大节省了内存,提高了服务器地服务效率,能够支持更多的客户服务。

数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并讲这些连接组成一个连接池,由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。

增量数据服务客户端

增量处理策略:第一次加载先判断增量数据表中是否存在最新记录,若有直接加载;否则,记录一下最大/最新的数据记录ID或时间点,保存到一个增量数据库或记录文件中。

从第二次加载开始只加载最大/最新的ID或时间点以后的数据。当加载过程全部成功完成之后并同步更新增量数据库或记录文件,更新这次数据记录的最后记录ID或时间点。

一般这类数据记录表有自增长列,那么也可以使用自增长列来实现这个标识特征。比如本次我用到数据表增长列F_ID。

class IncrementalRecordServer:
    _servers = []
    _instance = None
    def __new__(cls, *args, **kwargs):
        if not IncrementalRecordServer._instance:
            # IncrementalRecordServer._instance = super().__new__(cls)
            IncrementalRecordServer._instance = super(IncrementalRecordServer,cls).__new__(cls)
        return IncrementalRecordServer._instance

    def __init__(self,changeServersID=None):

        """
        变量初始化过程
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

    # 回调更新本地记录,清空记录替换,临时记录
    def record(func):
        def Server_record(self):
            v = func(self)
            text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt",record=IncrementalRecordServer._servers)
            print("保存成功")

            return v
        return Server_record

    #增加服务记录
    @record
    def addServer(self):
        self._servers.append([int(self.F_SDaqID_MAX),self.record_date])
        print("添加记录")
        Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX, f2=self.record_date)

    #修改服务记录
    @record
    def changeServers(self):
        # self._servers.pop()
        # 此处传入手动修改的记录ID
        self._servers.append([self.changeServersID,self.record_date])
        #先删除再插入实现修改
        Database_sqlite.Del_Max_ID_Records()
        Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID, f2=self.record_date)
        print("更新记录")

    #删除服务记录
    @record
    def popServers(self):
        # self._servers.pop()
        print("删除记录")
        Database_sqlite.Del_Max_ID_Records()

    # 最新服务记录
    def getServers(self):
        # print(self._servers[-1])
        Max_ID_Records = Database_sqlite.View_Max_ID_Records()
        print("查看记录",Max_ID_Records)
        return Max_ID_Records

    #提取数据
    def Incremental_data_client(self):
        """
        # 提取数据(增量数据MAXID获取,并提取增量数据)
        """
        # 实时数据库
        # 第一次加载先判断是否存在最新记录
        if self.getServers() == None:
            # 插入增量数据库ID
            self.addServer()
            # 提取增量数据
            data = Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX)
            return data

        # 获取增量数据库中已有的最新最大ID记录
        incremental_Max_ID = self.getServers()

        #添加记录
        self.addServer()
        # 提取增量数据
        Target_data_source = Database_sqlserver.get_incremental_data(incremental_Max_ID)

        return Target_data_source

优化策略:

1、延迟加载方式

以上增量记录服务类IncrementalRecordServer通过覆盖__new__方法来控制对象的创建,我们在创建对象的时候会先检查对象是否存在。也可以通过懒加载的方式实现,节约资源优化如下。

class IncrementalRecordServer:
    _servers = []
    _instance = None

    def __init__(self,changeServersID=None):
        """
        变量初始化过程
        """
        self.F_SDaqID_MAX = Database_sqlserver().get_F_SDaqID_MAX()
        self.record_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.changeServersID = changeServersID

        if not IncrementalRecordServer._instance:
            print("__init__对象创建")
        else:
            print("对象已经存在:",IncrementalRecordServer._instance)
            self.getInstance()

    @classmethod
    def getInstance(cls):
        if not cls._instance:
            cls._instance = IncrementalRecordServer()
        return cls._instance

懒汉式实例化能够确保实际需要时才创建对象,实例化a= IncrementalRecordServer()时,调用初始化__init__方法,但是没有新的对象创建。懒汉式这种方式加载类对象,也称为延迟加载方式。

2、单例模式能有效利用空间资源,每次利用同一空间资源。

不同操作对象的内存地址相同,且不同对象初始化将上一个对象初始化变量覆盖,确保最新记录实时更新。表面上以上代码实现了单例模式没问题,但多线程并发情况下,存在线程安全问题,可能同时创建不同的对象空间。考虑到线程安全,也可以进一步加锁处理.

3、适用范围及注意事项

本次代码适用于部署生产指定时间点运行之后产出的增量数据,长时间未启用再启动需要清空历史记录即增量数据库或文件ID需清空,一般实时数据增量实现一次加载没有什么问题,所以这一点也不用很关注(文件方式代码可自行完善);当加载历史数据库或定时间隔产生数据量过大时,需要进一步修改代码,需要判断数据规模,指定起始节点及加载数据量,综合因素考虑,下次分享一下亿级数据量提取方案。

4、进一步了解Python垃圾回收机制;并发情况下,通过优化线程池来管理资源。

最后可以添加一个函数来释放资源

def __del__(self):
    class_name = self.__class__.__name__
    print(class_name,"销毁")

del obj 调用__del__() 销毁对象,释放其空间;只有Python 对象在不再引用对象时被释放。当程序中有其它变量引用该实例对象时,即便手动调用 __del__() 方法,该方法也不会立即执行。这和 Python 的垃圾回收机制的实现有关。

结果测试

if __name__ == '__main__':
    for i in range(6):
        hc1 = IncrementalRecordServer()
        hc1.addServer()
        print("Record_ID",hc1._servers[i])
        # del hc1
        time.sleep(60)

    #Server2-客户端client
    # 最新服务记录
    hc2 = IncrementalRecordServer()
    hc2.getServers()
    #查看增量数据
    hc2.Incremental_data_client()

插入记录

模拟每1分钟插入一条记录,向增量数据库插入7条

if __name__ == '__main__':
    # Server3-客户端client
    # 手动添加增量起始ID记录
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    hc3.changeServers()

if __name__ == '__main__':
    #删除ID
    hc3 = IncrementalRecordServer(changeServersID='346449980')
    # hc3.changeServers()
    hc3.popServers()

以上就是Python实现实时增量数据加载工具的解决方案的详细内容,更多关于Python增量数据加载的资料请关注我们其它相关文章!

(0)

相关推荐

  • Python加载数据的5种不同方式(收藏)

    数据是数据科学家的基础,因此了解许多加载数据进行分析的方法至关重要.在这里,我们将介绍五种Python数据输入技术,并提供代码示例供您参考. 作为初学者,您可能只知道一种使用p andas.read_csv函数读取数据的方式(通常以CSV格式).它是最成熟,功能最强大的功能之一,但其他方法很有帮助,有时肯定会派上用场. 我要讨论的方法是: Manual 函数 loadtxt 函数 genfromtxtf 函数 read_csv 函数 Pickle 我们将用于加载数据的数据集可以在此处找到 .它被

  • Python增量循环删除MySQL表数据的方法

    需求场景: 有一业务数据库,使用MySQL 5.5版本,每天会写入大量数据,需要不定期将多表中"指定时期前"的数据进行删除,在SQL SERVER中很容易实现,写几个WHILE循环就搞定,虽然MySQL中也存在类似功能,怎奈自己不精通,于是采用Python来实现 话不多少,上脚本: # coding: utf-8 import MySQLdb import time # delete config DELETE_DATETIME = '2016-08-31 23:59:59' DELE

  • Python Pytorch深度学习之数据加载和处理

    目录 一.下载安装包 二.下载数据集 三.读取数据集 四.编写一个函数看看图像和landmark 五.数据集类 六.数据可视化 七.数据变换 1.Function_Rescale 2.Function_RandomCrop 3.Function_ToTensor 八.组合转换 九.迭代数据集 总结 一.下载安装包 packages: scikit-image:用于图像测IO和变换 pandas:方便进行csv解析 二.下载数据集 数据集说明:该数据集(我在这)是imagenet数据集标注为fac

  • python实现MySQL指定表增量同步数据到clickhouse的脚本

    python实现MySQL指定表增量同步数据到clickhouse,脚本如下: #!/usr/bin/env python3 # _*_ coding:utf8 _*_ from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import (DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent,) import clickhouse_driver

  • 利用python进行数据加载

    前言 最近参加了datawhale的组队学习活动,在组队学习动员下,开始通过强迫自己输出来实现更好的输入与处理,6-15开始自己的第一次文章发布,我会把自己这个真的很小白遇到的问题写出来,希望能给屏幕前小白的你带来帮助. 工作中大量繁琐的自动化,把以前在学校摸过的python重新捡起来,不成体系的.拼图一样把需要的工作搭建起来,工作暂时是可用上了,每天节省了至少3个小时的数据处理工作,手里拿着python这个锤子,看什么都像钉子. 首先,你要先学会安装软件,anaconda软件,安装成功后,你点

  • Python实现实时增量数据加载工具的解决方案

    目录 创建增量ID记录表 数据库连接类 增量数据服务客户端 结果测试 本次主要分享结合单例模式实际应用案例:实现实时增量数据加载工具的解决方案.最关键的是实现一个可进行添加.修改.删除等操作的增量ID记录表. 单例模式:提供全局访问点,确保类有且只有一个特定类型的对象.通常用于以下场景:日志记录或数据库操作等,避免对用一资源请求冲突. 创建增量ID记录表 import sqlite3 import datetime import pymssql import pandas as pd impor

  • Oracle 高速批量数据加载工具sql*loader使用说明

    SQL*Loader(SQLLDR)是Oracle的高速批量数据加载工具.这是一个非常有用的工具,可用于多种平面文件格式向Oralce数据库中加载数据.SQLLDR可以在极短的时间内加载数量庞大的数据.它有两种操作模式. 传统路径:(conventional path):SQLLDR会利用SQL插入为我们加载数据. 直接路径(direct path):采用这种模式,SQLLDR不使用SQL:而是直接格式化数据库块. 利用直接路径加载,你能从一个平面文件读数据,并将其直接写至格式化的数据库块,而绕

  • python机器学习pytorch自定义数据加载器

    目录 正文 1. 加载数据集 2. 迭代和可视化数据集 3.创建自定义数据集 3.1 __init__ 3.2 __len__ 3.3 __getitem__ 4. 使用 DataLoaders 为训练准备数据 5.遍历 DataLoader 正文 处理数据样本的代码可能会逐渐变得混乱且难以维护:理想情况下,我们希望我们的数据集代码与我们的模型训练代码分离,以获得更好的可读性和模块化.PyTorch 提供了两个数据原语:torch.utils.data.DataLoader和torch.util

  • python用pandas数据加载、存储与文件格式的实例

    数据加载.存储与文件格式 pandas提供了一些用于将表格型数据读取为DataFrame对象的函数.其中read_csv和read_talbe用得最多 pandas中的解析函数: 函数 说明 read_csv 从文件.URL.文件型对象中加载带分隔符的数据,默认分隔符为逗号 read_table 从文件.URL.文件型对象中加载带分隔符的数据.默认分隔符为制表符("\t") read_fwf 读取定宽列格式数据(也就是说,没有分隔符) read_clipboard 读取剪贴板中的数据,

  • Python实现从文件中加载数据的方法详解

    前几篇都是手动录入或随机函数产生的数据.实际有许多类型的文件,以及许多方法,用它们从文件中提取数据来图形化. 比如之前python基础(12)介绍打开文件的方式,可直接读取文件中的数据,扩大了我们的数据来源.下面,将展示几种方法. 我们将使用内置的 csv 模块加载CSV文件 CSV文件是一种特殊的文本文件,文件中的数据以逗号作为分隔符,很适合进行数据的解析.先用excle建立如下表格和数据,另存为csv格式文件,放到代码目录下. 包含在Python标准库中自带CSV 模块,我们只需要impor

  • Oracle数据加载和卸载的实现方法

    在日常工作中:经常会遇到这样的需求: Oracle 数据表跟文本或者文件格式进行交互:即将指定文件内容导入对应的 Oracle 数据表中:或者从 Oracle 数据表导出. 其他数据库中的表跟Oracle数据库进行交互. 若是少量数据:可选择的解决方案有很多.常用的用 Pl/SQL developer工具,或者手动转换为 INSERT 语句,或者通过API.但数据量大:用上面的方法效率太烂了.本文来说说 Oracle 数据的加载和卸载. Oracle中的DBLINK Oracle加载数据-外部表

  • 使用Python串口实时显示数据并绘图的例子

    使用pyserial进行串口传输 一.安装pyserial以及基本用法 在cmd下输入命令pip install pyserial 注:升级pip后会出现 "'E:\Anaconda3\Scripts\pip-script.py' is not present."错误 使用 easy_install pip命令就能解决,换一条重新能执行安装的命令 常用方法: ser = serial.Serial(0) 是打开第一个串口 print ser.portstr 能看到第一个串口的标识,wi

  • Tensorflow 多线程与多进程数据加载实例

    在项目中遇到需要处理超级大量的数据集,无法载入内存的问题就不用说了,单线程分批读取和处理(虽然这个处理也只是特别简单的首尾相连的操作)也会使瓶颈出现在CPU性能上,所以研究了一下多线程和多进程的数据读取和预处理,都是通过调用dataset api实现 1. 多线程数据读取 第一种方法是可以直接从csv里读取数据,但返回值是tensor,需要在sess里run一下才能返回真实值,无法实现真正的并行处理,但如果直接用csv文件或其他什么文件存了特征值,可以直接读取后进行训练,可使用这种方法. imp

  • 解决JSON数据因为null导致数据加载失败的方法

    一.首先分析问题: 使用NSJSONSerialization或者AFN框架的AFHTTPSessionManager(底层也是NSJSONSerialization)将NSData数据转化成OC对象,有时会出现URL正确,加载数据任然会报错: reason: '-[NSNull length]: unrecognized selector sent to instance 分析原因发现,转化出来的OC对象中含有null.所以,NSNull没有length方法,所以会报找不到方法错误. 二.解决

随机推荐