python ETL工具 pyetl

pyetl是一个纯python开发的ETL框架, 相比sqoop, datax 之类的ETL工具,pyetl可以对每个字段添加udf函数,使得数据转换过程更加灵活,相比专业ETL工具pyetl更轻量,纯python代码操作,更加符合开发人员习惯

安装

pip3 install pyetl

使用示例

数据库表之间数据同步

from pyetl import Task, DatabaseReader, DatabaseWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")
Task(reader, writer).start()

数据库表到hive表同步

from pyetl import Task, DatabaseReader, HiveWriter2
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = HiveWriter2("hive://localhost:10000/default", table_name="target")
Task(reader, writer).start()

数据库表同步es

from pyetl import Task, DatabaseReader, ElasticSearchWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = ElasticSearchWriter(hosts=["localhost"], index_name="tartget")
Task(reader, writer).start()

原始表目标表字段名称不同,需要添加字段映射

添加

# 原始表source包含uuid,full_name字段
reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
# 目标表target包含id,name字段
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# columns配置目标表和原始表的字段映射关系
columns = {"id": "uuid", "name": "full_name"}
Task(reader, writer, columns=columns).start()

字段的udf映射,对字段进行规则校验、数据标准化、数据清洗等

# functions配置字段的udf映射,如下id转字符串,name去除前后空格
functions={"id": str, "name": lambda x: x.strip()}
Task(reader, writer, columns=columns, functions=functions).start()

继承Task类灵活扩展ETL任务

import json
from pyetl import Task, DatabaseReader, DatabaseWriter

class NewTask(Task):
  reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
  writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")

  def get_columns(self):
    """通过函数的方式生成字段映射配置,使用更灵活"""
    # 以下示例将数据库中的字段映射配置取出后转字典类型返回
    sql = "select columns from task where name='new_task'"
    columns = self.writer.db.read_one(sql)["columns"]
    return json.loads(columns)

  def get_functions(self):
    """通过函数的方式生成字段的udf映射"""
    # 以下示例将每个字段类型都转换为字符串
    return {col: str for col in self.columns}

  def apply_function(self, record):
    """数据流中对一整条数据的udf"""
    record["flag"] = int(record["id"]) % 2
    return record

  def before(self):
    """任务开始前要执行的操作, 如初始化任务表,创建目标表等"""
    sql = "create table destination_table(id int, name varchar(100))"
    self.writer.db.execute(sql)

  def after(self):
    """任务完成后要执行的操作,如更新任务状态等"""
    sql = "update task set status='done' where name='new_task'"
    self.writer.db.execute(sql)

NewTask().start()

目前已实现Reader和Writer列表

Reader 介绍
DatabaseReader 支持所有关系型数据库的读取
FileReader 结构化文本数据读取,如csv文件
ExcelReader Excel表文件读取
Writer 介绍
DatabaseWriter 支持所有关系型数据库的写入
ElasticSearchWriter 批量写入数据到es索引
HiveWriter 批量插入hive表
HiveWriter2 Load data方式导入hive表(推荐)
FileWriter 写入数据到文本文件

项目地址pyetl

总结

到此这篇关于python ETL工具 pyetl的文章就介绍到这了,更多相关python ETL工具 pyetl内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 使用python telnetlib批量备份交换机配置的方法

    使用了telnetlib模块,首先登录到交换机,列出并获取配置文件的名称,然后通过tftp协议将配置文件传输到文件服务器上,为避免配置文件覆盖,将备份的配置文件名称统一加入日期以作区分. 1. 登录方式和口令有好几种,比较懒惰,通过不同列表以做区分,如果每个交换机口令都不相同的话,就需要额外处理了. 2. 交换机的配置文件也有多种类型,也是通过列表进行区分. 3. 有些交换机支持ftp和sftp,但测试发现有些虽然有相应的客户端命令,但传输总有问题.也不能将每个交换机都配置为ftp服务器,不安全

  • Python linecache.getline()读取文件中特定一行的脚本

    比如: Code highlighting produced by Actipro CodeHighlighter (freeware) http://www.CodeHighlighter.com/ -->import linecacheprint linecache.getline('2.1_open.py', 4)将返回我上一节事例代码文件2.1_open.py的第4行文字,输出结果:f = open('/home/evergreen/桌面/test') 查看linecache中的实现(我

  • 在python中logger setlevel没有生效的解决

    在logging中,Logger's level 的默认等级为warning 所以虽然在handler中setlervel了,Logger's level 和Handler's Level 但是level取较高的那个(待校验) 所以日志的level 为warning 解决此问题可以采用 logging.root.setLevel(logging.NOTSET) 完整源码如下图: import logging class loggerr(object): def __init__(self,log

  • python ETL工具 pyetl

    pyetl是一个纯python开发的ETL框架, 相比sqoop, datax 之类的ETL工具,pyetl可以对每个字段添加udf函数,使得数据转换过程更加灵活,相比专业ETL工具pyetl更轻量,纯python代码操作,更加符合开发人员习惯 安装 pip3 install pyetl 使用示例 数据库表之间数据同步 from pyetl import Task, DatabaseReader, DatabaseWriter reader = DatabaseReader("sqlite://

  • 用于ETL的Python数据转换工具详解

    ETL的考虑   做 数据仓库系统,ETL是关键的一环.说大了,ETL是数据整合解决方案,说小了,就是倒数据的工具.回忆一下工作这么些年来,处理数据迁移.转换的工作倒 还真的不少.但是那些工作基本上是一次性工作或者很小数据量,使用access.DTS或是自己编个小程序搞定.可是在数据仓库系统中,ETL上升到了一 定的理论高度,和原来小打小闹的工具使用不同了.究竟什么不同,从名字上就可以看到,人家已经将倒数据的过程分成3个步骤,E.T.L分别代表抽取.转换 和装载. 其 实ETL过程就是数据流动的

  • Java实现的执行python脚本工具类示例【使用jython.jar】

    本文实例讲述了Java实现的执行python脚本工具类.分享给大家供大家参考,具体如下: 这里java中执行python脚本工具类,需要使用jython.jar java中执行python脚本工具类,学习的时候写着玩: import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; i

  • Python切片工具pillow用法示例

    本文实例讲述了Python切片工具pillow用法.分享给大家供大家参考,具体如下: 切片:使用切片将源图像分成许多的功能区域 因为要对图片进行切片裁剪,所以用到切片工具必不可少,在ubuntu下有很多的图片处理工具,如 GIMP(Ubuntu的下的Photoshop),shotwell,shotter等等. 但是我想吧一张图片剪裁下来,用那些工具不怎么方便(其实可能是我没有找到而已),于是上网搜索资料,发现各式各类的工具,其中发现了pollow这款工具. 算是Python下的一个模块吧,这个模

  • 浅析python打包工具distutils、setuptools

    python中安装包的方式有很多种: 源码包:python setup.py install 在线安装:pip install 包名(linux) / easy_install 包名(window) python包在开发中十分常见,一般的使用套路是所有的功能做一个python模块包,打包模块,然后发布,安装使用.打包和安装包就是最常见的工作.学习中遇到distutils和setuptools两种打包的工具,学习之后做笔记记录. distutils distutils 是 python 标准库的一

  • Python小工具之消耗系统指定大小内存的方法

    工作中需要根据某个应用程序具体吃了多少内存来决定执行某些操作,所以需要写个小工具来模拟应用程序使用内存情况,下面是我写的一个Python脚本的实现. #!/usr/bin/python # -*- coding: utf-8 -*- import sys import re import time def print_help(): print 'Usage: ' print ' python mem.py 100MB' print ' python mem.py 1GB' if __name_

  • 很酷的python表白工具 你喜欢我吗

    本文实例为大家分享了python表白工具的具体代码,供大家参考,具体内容如下 实现代码: # 打包操作 # 安装pyinstaller # cmd输入 pip install pyinstaller # shift+右击文件夹 点击在此处打开命令窗口 # pyinstaller -F -w love.py //打包程序 # 引用tkinter工具包 from tkinter import * #__all__=[a,b] #from tkinter import messagebox # 定义关

  • python图形工具turtle绘制国际象棋棋盘

    本文实例为大家分享了python图形工具turtle绘制国际象棋棋盘的具体代码,供大家参考,具体内容如下 #编写程序绘制一个国际象棋的棋盘 import turtle turtle.speed(30) turtle.penup() off = True for y in range(-40, 30 + 1, 10): for x in range(-40, 30 + 1, 10): if off: turtle.goto(x, y) turtle.pendown() turtle.begin_f

  • Python定时任务工具之APScheduler使用方式

    APScheduler (advanceded python scheduler)是一款Python开发的定时任务工具. 文档地址 apscheduler.readthedocs.io/en/latest/u- 特点: 不依赖于Linux系统的crontab系统定时,独立运行 可以 动态添加 新的定时任务,如下单后30分钟内必须支付,否则取消订单,就可以借助此工具(每下一单就要添加此订单的定时任务) 对添加的定时任务可以做持久保存 1 安装 pip install apscheduler 2 组

  • python自动化工具之pywinauto实例详解

    本文实例为大家分享了python自动化工具pywinauto,供大家参考,具体内容如下 一.win环境应用自动化 1.浏览器中下载 2.在cmd下启动:python get-pip.py 3.在cmd中输入python -m pip --version查看安装pip的版本. 4.模块安装:pip3 install pywinauto 5.程序中加载模块: import time from pywinautoimport application 二,实例 #encoding=utf-8 #auth

随机推荐