Python 获取 datax 执行结果保存到数据库的方法

执行 datax 作业,创建执行文件,在 crontab 中每天1点(下面有关系)执行:

其中 job_start 及 job_finish 这两行记录是自己添加的,为了方便识别出哪张表。

#!/bin/bash
source /etc/profile
user1="root"
pass1="pwd"
user2="root"
pass2="pwd"
job_path="/opt/datax/job/"

jobfile=(
job_table_a.json
job_table_b.json
)

for filename in ${jobfile[@]}
do
	echo "job_start: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
	python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" ${job_path}${filename}
	echo "job_finish: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
done

# 0 1 * * * /opt/datax/job/dc_to_ods_incr.sh >> /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1
# egrep '任务|速度|总数|job_start|job_finish' /opt/datax/job/log/

datax 执行日志:

job_start: 2018-08-08 01:13:28 job_table_a.json
任务启动时刻          : 2018-08-08 01:13:28
任务结束时刻          : 2018-08-08 01:14:49
任务总计耗时          :         81s
任务平均流量          :     192.82KB/s
记录写入速度          :      1998rec/s
读出记录总数          :       159916
读写失败总数          :          0
job_finish: 2018-08-08 01:14:49 job_table_a.json
job_start: 2018-08-08 01:14:49 job_table_b.json
任务启动时刻          : 2018-08-08 01:14:50
任务结束时刻          : 2018-08-08 01:15:01
任务总计耗时          :         11s
任务平均流量          :        0B/s
记录写入速度          :       0rec/s
读出记录总数          :          0
读写失败总数          :          0
job_finish: 2018-08-08 01:15:01 job_table_b.json

接下来读取这些信息保存到数据库,在数据库中创建表:

CREATE TABLE `datax_job_result` (
 `log_file` varchar(200) DEFAULT NULL,
 `job_file` varchar(200) DEFAULT NULL,
 `start_time` datetime DEFAULT NULL,
 `end_time` datetime DEFAULT NULL,
 `seconds` int(11) DEFAULT NULL,
 `traffic` varchar(50) DEFAULT NULL,
 `write_speed` varchar(50) DEFAULT NULL,
 `read_record` int(11) DEFAULT NULL,
 `failed_record` int(11) DEFAULT NULL,
 `job_start` varchar(200) DEFAULT NULL,
 `job_finish` varchar(200) DEFAULT NULL,
 `insert_time` datetime DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

定时执行以下文件,因为 datax 作业 1 点执行,为了获取一天内最新生产的日志,脚本中取 82800内生产的日志文件,及23 小时内生产的那个最新日志。所以一天内任何时间执行都可以。此文件也是定时每天执行(判断 datax 作业完成后执行)

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1

import re
import os
import sqlalchemy
import pandas as pd
import datetime as dt

def save_to_db(df):
	engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test", encoding="utf-8")
	df.to_sql("datax_job_result", engine, index=False, if_exists='append') 

def get_the_latest_file(path):
	t0 = dt.datetime.utcfromtimestamp(0)
	d2 = (dt.datetime.now() - t0).total_seconds()
	d1 = d2 - 82800
	for (dirpath, dirnames, filenames) in os.walk(path):
		for filename in sorted(filenames, reverse = True):
			if filename.endswith(".log"):
				f = os.path.join(dirpath,filename)
				ctime = os.stat(f)[-1]
				if ctime>=d1 and ctime <=d2:
					return f

def get_job_result_from_logfile(path):
	result = pd.DataFrame(columns=['log_file','job_file','start_time','end_time','seconds','traffic','write_speed','read_record','failed_record','job_start','job_finish'])
	log_file = get_the_latest_file(path)
	index = 0
	content = open(log_file, "r")
	for line in content:
		result.loc[index, 'log_file'] = log_file
		if re.compile(r'job_start').match(line):
			result.loc[index, 'job_file'] = line.split(' ')[4].strip()
			result.loc[index, 'job_start'] = line,
		elif re.compile(r'任务启动时刻').match(line):
			result.loc[index, 'start_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()
		elif re.compile(r'任务结束时刻').match(line):
			result.loc[index, 'end_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()
		elif re.compile(r'任务总计耗时').match(line):
			result.loc[index, 'seconds'] = line.split(':')[1].strip().replace('s','')
		elif re.compile(r'任务平均流量').match(line):
			result.loc[index, 'traffic'] = line.split(':')[1].strip()
		elif re.compile(r'记录写入速度').match(line):
			result.loc[index, 'write_speed'] = line.split(':')[1].strip()
		elif re.compile(r'读出记录总数').match(line):
			result.loc[index, 'read_record'] = line.split(':')[1].strip()
		elif re.compile(r'读写失败总数').match(line):
			result.loc[index, 'failed_record'] = line.split(':')[1].strip()
		elif re.compile(r'job_finish').match(line):
			result.loc[index, 'job_finish'] = line,
			index = index + 1
		else:
			pass
	save_to_db(result)

get_job_result_from_logfile("/opt/datax/job/log")

以上这篇Python 获取 datax 执行结果保存到数据库的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 详解Python的数据库操作(pymysql)

    使用原生SQL语句进行对数据库操作,可完成数据库表的建立和删除,及数据表内容的增删改查操作等.其可操作性很强,如可以直接使用"show databases"."show tables"等语句进行表格之外的部分操作. Centos7远程操作数据库时需要关闭防火墙,否则会连接不上 安装: pip3 install pymysql 数据查询: import pymysql #建立数据库连接 conn=pymysql.connect(host="192.168.1

  • Python 获取 datax 执行结果保存到数据库的方法

    执行 datax 作业,创建执行文件,在 crontab 中每天1点(下面有关系)执行: 其中 job_start 及 job_finish 这两行记录是自己添加的,为了方便识别出哪张表. #!/bin/bash source /etc/profile user1="root" pass1="pwd" user2="root" pass2="pwd" job_path="/opt/datax/job/" j

  • python获取程序执行文件路径的方法(推荐)

    1.获取当前执行主脚本方法:sys.argv[0]和_ file _ (1)sys.argv 一个传给Python脚本的指令参数列表.sys.argv[0]是脚本的名字.一般得到的是相对路径,用os.path.abspath(sys.argv[0])得到执行文件的绝对路径: dirname, filename = os.path.split(os.path.abspath(sys.argv[0])) os.path.realpath(sys.argv[0]) 如果在命令行执行sys.argv返回

  • vue开发chrome插件,实现获取界面数据和保存到数据库功能

    前言 最近在评估项目时,要开启评估平台,查看平台和保存平台,感觉非常繁琐,开发了一款可以获取评估平台数据,查看项目排期和直接保存数据到数据库的chrome插件,由于我需要使用之前vue封装的一个日历插件,这里就用vue来开发这个插件. 开发前准备 要开发一个chrome插件,我们首先需要了解chrome插件的基本结构和对应的功能. 每个扩展的文件类型和目录数量有所不同,但都必须有 manifest. 一些基本但有用的扩展程序可能仅由 manifest 及其工具栏图标组成. manifest.js

  • Python轻量级ORM框架Peewee访问sqlite数据库的方法详解

    本文实例讲述了Python轻量级ORM框架Peewee访问sqlite数据库的方法.分享给大家供大家参考,具体如下: ORM框架就是 object relation model,对象关系模型,用来实现把数据库中的表 映射到 面向对象编程语言中的类,不需要写sql,通过操作对象就能实现 增删改查. ORM的基本技术有3种: (1)映射技术 数据类型映射:就是把数据库中的数据类型,映射到编程语言中的数据类型.比如,把数据库的int类型映射到Python中的integer 类型. 类映射:把数据库中的

  • thinkPHP3.0框架实现模板保存到数据库的方法

    本文实例讲述了thinkPHP3.0框架实现模板保存到数据库的方法.分享给大家供大家参考,具体如下: 在开发cms的时候用到如果将模板文件存入到数据库并显示到页面中 由于thinkphp3.0都是直接从模板文件中读取再解析的那么对于模板存入数据库中就只有自己开发了,还有thinkphp3.0中有mode的功能我们可以定义自己的mode这样就可以达到目的了,那么如何来扩展自己的mode呢?如下: 1.在你的入口文件中输入 define('MODE_NAME','Ey'); 其中"Ey"就

  • PHP获取MySQL执行sql语句的查询时间方法

    如下所示: //计时开始 runtime(); //执行查询 mysql_query($sql); //计时结束. echo runtime(1); //计时函数 function runtime($mode=0) { static $t; if(!$mode) { $t = microtime(); return; } $t1 = microtime(); list($m0,$s0) = explode(" ",$t); list($m1,$s1) = explode("

  • 分享Python获取本机IP地址的几种方法

    目录 1.使用专用网站 2.使用自带socket库 3.使用第三方netifaces库 1.使用专用网站 获取的是公网IP 网址:http://myip.ipip.net 代码: import requests res = requests.get('http://myip.ipip.net', timeout=5).text print(res) 比较喜欢用这个,在命令窗口也能使用: curl http://myip.ipip.net 2.使用自带socket库 获取的是局域网IP. impo

  • Python获取某一天是星期几的方法示例

    本文实例讲述了Python获取某一天是星期几的方法.分享给大家供大家参考,具体如下: 这里以2017年的春节(1月28号)为例: import re; import time; import datetime; if(__name__=="__main__"): #today=int(time.strftime("%w")); anyday=datetime.datetime(2017,1,28).strftime("%w"); print an

  • python获取指定目录下所有文件名列表的方法

    本文实例讲述了python获取指定目录下所有文件名列表的方法.分享给大家供大家参考.具体实现方法如下: 这里python代码实现获取文件名列表的功能,可以指定文件中包含的字符,方便提取特定类型的文件名列表: # -*- coding: utf-8 -*- #~ #------------------------------------------------------------------ #~ module:wlab #~ Filename:wgetfilelist.py #~ Funct

  • python获取外网IP并发邮件的实现方法

    第一步:通过ip138来爬取外网ip 第二步:通过python的smtplib模块和email来发送邮件,具体用法去网上搜索, 下面是代码示例: #!/usr/bin/env python #coding:utf-8 import urllib2 import re import smtplib from email.MIMEText import MIMEText from email.Header import Header #################################

随机推荐