Python读取Hive数据库实现代码详解

目录
  • 实际业务读取hive数据库的代码
  • 代码说明和领悟
  • 后续附上修改成mysql的一个例子代码

背景:

在这篇文章之前,我读取数据库的数据没有形成规范,并且代码扩展性不好,使用率不高,而且比较混乱。数据库信息的替换也比较混乱。坏习惯包括:连接数据库之后就开始读数,读完就结束,数据的存放也没有规范,而且容易重复读取。

现在将代码分为几层,一层是底层,就是单独连接数据库,在这基础上封装第二个类别,加上了线程锁和时间表,用于确保读数的稳定和超时错误提醒。第三层才是真正的业务,第三层的类里面封装了很多读取不同数据表的方法,每一个方法就是读一个表,然后将数据缓存起来,并且设置好更新数据缓存的时间(例如24小时),和维护多线程读数。

第四层也就是简单的调用第三层即可,然后所有的数据都可以读取然后缓存到我们在配置项中指定的文件夹目录了

实际业务读取hive数据库的代码

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class HiveHelper(object):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.logger = logger
        self.impala_conn = None
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
    def create_table_code(self, file_name):
        '''创建表类代码'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn
    def get_conn(self):
        '''创建连接或获取连接'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn
    def get_impala_conn(self):
        '''创建连接或获取连接'''
        if self.impala_conn is None:
            self.impala_conn = connect(
                host=self.host,
                port=self.port,
                database=self.database,
                auth_mechanism=self.auth_mechanism,
                user=self.user,
                password=self.password
                )
        return self.impala_conn
    def get_engine(self):
        '''创建连接或获取连接'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)
        return self.engine
    def get_cursor(self):
        '''创建连接或获取连接'''
        if self.cursor is None:
            self.cursor = self.conn.cursor()
        return self.cursor
    def get_session(self) -> sessionmaker:
        '''创建连接或获取连接'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session
    def close_conn(self):
        '''关闭连接'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()
    def close_impala_conn(self):
        '''关闭impala连接'''
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None
    def close_session(self):
        '''关闭连接'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()
    def dispose_engine(self):
        '''释放engine'''
        if self.engine is not None:
            # self.engine.dispose(close=False)
            self.engine.dispose()
            self.engine = None
    def close_cursor(self):
        '''关闭cursor'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''查询数据'''
        conn = self.get_conn()
        data = None
        try:
            # 异常重试3次
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex # 往外抛出异常
                    time.sleep(60) # 一分钟后重试
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            if auto_close:
                self.close_conn()
        return data
pass
class VarsHelper():
    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)
                f.close()
    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()
    def get_value(self, key):
        return self.values[key]
    def has_key(self, key):
        return key in self.values.keys()
    def save_file(self):
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
            f.close()
pass
class GlobalShareArgs():
    args = {
        "debug": False
    }
    def get_args():
        return GlobalShareArgs.args
    def set_args(args):
        GlobalShareArgs.args = args
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in GlobalShareArgs.args.keys()
    def update(args):
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录
        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共
        "only_predict": False, # 只识别,不训练
        "delete_model": True, # 先删除模型,仅在训练时使用
        "export_excel": False, # 导出excel
        "classes": 12, # 聚类数
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0, # 4.0
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }
    def get_args():
        return ShareArgs.args
    def set_args(args):
        ShareArgs.args = args
    def set_args_value(key, value):
        ShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in ShareArgs.args.keys()
    def update(args):
        ShareArgs.args.update(args)
pass
class UrBiGetDatasBase():
    # 线程锁列表,同保存路径共用锁
    lock_dict:Dict[str, threading.Lock] = {}
    # 时间列表,用于判断是否超时
    time_dict:Dict[str, datetime.datetime] = {}
    # 用于记录是否需要更新超时时间
    get_data_timeout_dict:Dict[str, bool] = {}
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger
            )
        # 创建子目录
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas')
    def close(self):
        '''关闭连接'''
        self.db_helper.close_conn()
    def get_last_time(self, key_name) -> bool:
        '''获取是否超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
        timeout = 12 # 12小时
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24 # 24小时
        get_data_timeout = False
        if key_name not in UrBiGetDatasBase.time_dict.keys() or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds()>(timeout*60*60):
            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)
            # UrBiGetDatasBase.time_list[key_name] = datetime.datetime.today()
            get_data_timeout = True
        else:
            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)
        # if self.vars_helper is not None :
        #     self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_list)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout
    def save_last_time(self, key_name):
        '''更新状态超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if UrBiGetDatasBase.get_data_timeout_dict[key_name]:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None :
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)
    def get_lock(self, key_name) -> threading.Lock:
        '''获取锁'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if key_name not in UrBiGetDatasBase.lock_dict.keys():
            UrBiGetDatasBase.lock_dict[key_name] = threading.Lock()
        return UrBiGetDatasBase.lock_dict[key_name]
    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=[-1], # 删除最后下标
        start_date = datetime.datetime(2017, 1, 1), # 开始时间
        offset = relativedelta(months=3), # 时间间隔
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        stop_date = '20700101', # 超过时间则停止
        data_format_fun = None, # 格式化数据
        ):
        '''分时间增量读取数据'''
        # 创建文件夹
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            #删除最后一个文件
            file_list = os.listdir(save_dir)
            if len(file_list)>0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir,file_list[del_index]))
                    print('删除最后一个文件:', file_list[del_index])
        select_index = -1
        # start_date = datetime.datetime(2017, 1, 1)
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            # self.logger.info('file_path: %s', file_path)
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                # self.logger.info('data: %d', data.columns)
                if len(data)>0:
                    select_index+=1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    # 排序
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    break
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
pass
class UrBiGetDatas(UrBiGetDatasBase):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_dw_data',
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_dim_date(self):
        '''日期数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_date'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_date.'+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_date.date_key'])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_shop(self):
        '''店铺数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_shop.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_shop'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_shop.'+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_shop.shop_no'])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_vip(self):
        '''会员数据'''
        sub_dir = os.path.join(self.save_dir,'vip_no')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = '''SELECT dv.*, dd.date_key, dd.date_name2
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date=dd.date_name2
            where dd.date_key >= %s
            and dd.date_key < %s'''
            # data:pd.DataFrame = self.db_helper.get_data(sql)
            sort_columns = ['dv.vip_no']
            # TODO:
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 开始时间
                offset=relativedelta(years=1)
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_weather(self):
        '''天气数据'''
        sub_dir = os.path.join(self.save_dir,'weather')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key>=%s and weather.date_key<%s
            """
            sort_columns = ['weather.date_key','weather.areaid']
            def data_format_fun(data):
                columns = list(data.columns)
                columns = {c:'weather.'+c for c in columns}
                data = data.rename(columns=columns)
                return data
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                del_index_list=[-2, -1], # 删除最后下标
                data_format_fun=data_format_fun,
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_weather_city(self):
        '''天气城市数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.weather_city.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_weather_city as weather_city'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'weather_city.'+c for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods(self):
        '''货品数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_goods'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_goods.'+c for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods_market_shop_date(self):
        '''店铺商品生命周期数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_shop_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'
            sql = '''
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('lifecycle_end_date.','') for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['shop_market_date'])
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods_market_date(self):
        '''全国商品生命周期数据'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = '''
            select * FROM ur_bi_dw.dim_goods_market_date
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_goods_market_date.'+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_goods_market_date.sku_no'])
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods_color_dev_sizes(self):
        '''商品开发码数数据'''
        file_path = os.path.join(self.save_dir,'dim_goods_color_dev_sizes.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'
            sql = 'SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('dim_goods_color_dev_sizes.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dwd_daily_sales_size(self):
        '''实际销售金额'''
        sub_dir = os.path.join(self.save_dir,'dwd_daily_sales_size_all')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key>=%s and sales.date_key<%s
                and sales.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = ['date_key','shop_no','sku_no']
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 开始时间
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dwd_daily_delivery_size(self):
        '''实际配货金额'''
        sub_dir = os.path.join(self.save_dir,'dwd_daily_delivery_size_all')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key>=%s and delivery.date_key<%s
                and delivery.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = ['date_key','shop_no','sku_no']
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 开始时间
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_v_last_nation_sales_status(self):
        '''商品畅滞销数据'''
        file_path = os.path.join(self.save_dir,'v_last_nation_sales_status.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.v_last_nation_sales_status'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('v_last_nation_sales_status.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dwd_daily_finacial_goods(self):
        '''商品成本价数据'''
        file_path = os.path.join(self.save_dir,'dwd_daily_finacial_goods.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = """
            select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no,`size`,max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code='CNY' and country_code='CN'
                group by sku_no,`size`
            ) as t2
            on t2.sku_no=t1.sku_no
                and t2.`size`=t1.`size`
                and t2.date_key=t1.date_key
            where t1.currency_code='CNY' and t1.country_code='CN'
            """
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('t1.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_size_group(self):
        '''尺码映射数据'''
        file_path = os.path.join(self.save_dir,'dim_size_group.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = """select * from ur_bi_dw.dim_size_group"""
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('dim_size_group.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
pass
def get_common_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    logger:logging.Logger=None):
    # 共用文件
    common_datas_dir = ShareArgs.get_args_value('common_datas_dir')
    common_ur_bi_dir = os.path.join(common_datas_dir, 'ur_bi_data')
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger
    )
    try:
        logger.info('正在查询日期数据...')
        ur_bi_get_datas.get_dim_date()
        logger.info('查询日期数据完成!')
        logger.info('正在查询店铺数据...')
        ur_bi_get_datas.get_dim_shop()
        logger.info('查询店铺数据完成!')
        logger.info('正在查询天气数据...')
        ur_bi_get_datas.get_weather()
        logger.info('查询天气数据完成!')
        logger.info('正在查询天气城市数据...')
        ur_bi_get_datas.get_weather_city()
        logger.info('查询天气城市数据完成!')
        logger.info('正在查询货品数据...')
        ur_bi_get_datas.get_dim_goods()
        logger.info('查询货品数据完成!')
        logger.info('正在查询实际销量数据...')
        ur_bi_get_datas.get_dwd_daily_sales_size()
        logger.info('查询实际销量数据完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外抛出异常
    finally:
        ur_bi_get_datas.close()
pass
class CustomUrBiGetDatas(UrBiGetDatasBase):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_data',
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_sales_goal_amt(self):
        '''销售目标金额'''
        file_path = os.path.join(self.save_dir,'month_of_year_sales_goal_amt.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = '''
            select sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial),
                dates.month_of_year
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'shop_no':'sales_goal.shop_no',
                'serial':'sales_goal.serial',
                'month_of_year':'dates.month_of_year',
            })
            # 排序
            data = data.sort_values(['sales_goal.shop_no','sales_goal.serial','dates.month_of_year'])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_shop_serial_area(self):
        '''店-系列面积'''
        file_path = os.path.join(self.save_dir,'shop_serial_area.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = '''
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'shop_no':'shop_serial_area.shop_no',
                'serial':'shop_serial_area.serial',
                'month_of_year':'shop_serial_area.month_of_year',
                'area':'shop_serial_area.area',
            })
            # 排序
            data = data.sort_values(['shop_serial_area.shop_no','shop_serial_area.serial','shop_serial_area.month_of_year'])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
pass
def get_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger:logging.Logger=None):
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # 店,系列,品类,年月,销售目标金额
        logger.info('正在查询年月销售目标金额数据...')
        ur_bi_get_datas.get_sales_goal_amt()
        logger.info('查询年月销售目标金额数据完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外抛出异常
    finally:
        ur_bi_get_datas.close()
pass
def getdata_ur_bi_dw(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger=None
):
    get_common_datas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        logger=logger
    )
    get_datas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
pass
# 代码入口
# getdata_ur_bi_dw(
#     host=ur_bi_dw_host,
#     port=ur_bi_dw_port,
#     database=ur_bi_dw_database,
#     auth_mechanism=ur_bi_dw_auth_mechanism,
#     user=ur_bi_dw_user,
#     password=ur_bi_dw_password,
#     save_dir=ur_bi_dw_save_dir,
#     logger=logger
#     )

代码说明和领悟

每个类的具体作用说明,代码需要根据下面的文字说明进行“食用”:

(第一层)HiveHelper完成了连接数据库、关闭数据库连接、生成事务、执行、引擎、连接等功能

VarsHelper提供了一个简单的持久化功能,可以将对象以文件的形式存放在磁盘上。并提供设置值、获取值、判断值是否存在的方法

GlobalShareArgs提供了一个字典,并且提供了获取字典、设置字典、设置字典键值对、设置字典键的值、判断键是否在字典中、更新字典等方法

ShareArgs跟GlobalShareArgs类似,只是一开始字典的初始化的键值对比较多

(第二层)UrBiGetDataBase类,提供了线程锁字典、时间字典、超时判断字典,都是类变量;使用了HiveHelper类,但注意,不是继承。在具体的sql读数时,提供了线程固定和时间判断

(第三层)UrBiGetDatas类,获取hive数据库那边的日期数据、店铺数据、会员数据、天气数据、天气城市数据、商品数据、店铺生命周期数据、全国商品生命周期数据、商品开发码数数据、实际销售金额、实际配货金额、商品畅滞销数据、商品成本价数据、尺码映射数据等。

(第四层)get_common_data函数,使用URBiGetData类读取日期、店铺、天气、天气城市、货品、实际销量数据,并缓存到文件夹./yongjian/data/ur_bi_data下面

CustomUrBiGetData类,继承了UrBiGetDatasBase类,读取销售目标金额、点系列面积数据。

(这个也是第四层)get_datas函数,通过CustomUrBiGetData类,读取年月销售目标金额。

总的函数:(这个是总的调用入口函数)get_data_ur_bi_dw函数,调用了get_common_data和get_datas函数进行读取数据,然后将数据保存到某个文件夹目录下面。

举一反三,如果你不是hive数据库,你可以将第一层这个底层更换成mysql。主页有解释如果进行更换。第二层不需要改变,第三层就是你想要进行读取的数据表,不同的数据库你想要读取的数据表也不同,所以sql需要你在这里写,套用里面的方法即可,基本上就是修改sql就好了。

这种方法的好处在于,数据不会重复读取,并且读取的数据都可以得到高效的使用。

后续附上修改成mysql的一个例子代码

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class MySqlHelper(object):
    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='7cmoP3QDtueVJQj2q4Az',
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.logger = logger
        self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(
            self.user, self.password, self.host, self.port, self.database
        )
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
    def create_table_code(self, file_name):
        '''创建表类代码'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn
    def get_conn(self):
        '''创建连接或获取连接'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn
    def get_engine(self):
        '''创建连接或获取连接'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine(self.connection_str)
        return self.engine
    def get_cursor(self):
        '''创建连接或获取连接'''
        if self.cursor is None:
            self.cursor = self.conn.cursor()
        return self.cursor
    def get_session(self) -> sessionmaker:
        '''创建连接或获取连接'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session
    def close_conn(self):
        '''关闭连接'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
    def close_session(self):
        '''关闭连接'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()
    def dispose_engine(self):
        '''释放engine'''
        if self.engine is not None:
            # self.engine.dispose(close=False)
            self.engine.dispose()
            self.engine = None
    def close_cursor(self):
        '''关闭cursor'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''查询数据'''
        conn = self.get_conn()
        data = None
        try:
            # 异常重试3次
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex # 往外抛出异常
                    time.sleep(60) # 一分钟后重试
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            if auto_close:
                self.close_conn()
        return data
pass
class VarsHelper():
    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)
                f.close()
    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()
    def get_value(self, key):
        return self.values[key]
    def has_key(self, key):
        return key in self.values.keys()
    def save_file(self):
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
            f.close()
pass
class GlobalShareArgs():
    args = {
        "debug": False
    }
    def get_args():
        return GlobalShareArgs.args
    def set_args(args):
        GlobalShareArgs.args = args
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in GlobalShareArgs.args.keys()
    def update(args):
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录
        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共
        "only_predict": False, # 只识别,不训练
        "delete_model": True, # 先删除模型,仅在训练时使用
        "export_excel": False, # 导出excel
        "classes": 12, # 聚类数
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0, # 4.0
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }
    def get_args():
        return ShareArgs.args
    def set_args(args):
        ShareArgs.args = args
    def set_args_value(key, value):
        ShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in ShareArgs.args.keys()
    def update(args):
        ShareArgs.args.update(args)
pass
class IMSGetDatasBase():
    # 线程锁列表,同保存路径共用锁
    lock_dict:Dict[str, threading.Lock] = {}
    # 时间列表,用于判断是否超时
    time_dict:Dict[str, datetime.datetime] = {}
    # 用于记录是否需要更新超时时间
    get_data_timeout_dict:Dict[str, bool] = {}
    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='Ur#7cmoP3QDtueVJQj2q4Az',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = MySqlHelper(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            logger=logger
            )
        # 创建子目录
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas') # 把超时时间保存到文件,注释该行即可停掉,只用于调试
    def close(self):
        '''关闭连接'''
        self.db_helper.close_conn()
    def get_last_time(self, key_name) -> bool:
        '''获取是否超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'):
            IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list')
        timeout = 12 # 12小时
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24 # 24小时
        get_data_timeout = False
        if key_name not in IMSGetDatasBase.time_dict.keys() or (datetime.datetime.today() - IMSGetDatasBase.time_dict[key_name]).total_seconds()>(4*60*60):
            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)
            # IMSGetDatasBase.time_list[key_name] = datetime.datetime.today()
            get_data_timeout = True
        else:
            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)
        # if self.vars_helper is not None :
        #     self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_list)
        IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout
    def save_last_time(self, key_name):
        '''更新状态超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if IMSGetDatasBase.get_data_timeout_dict[key_name]:
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None :
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict)
    def get_lock(self, key_name) -> threading.Lock:
        '''获取锁'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if key_name not in IMSGetDatasBase.lock_dict.keys():
            IMSGetDatasBase.lock_dict[key_name] = threading.Lock()
        return IMSGetDatasBase.lock_dict[key_name]
    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=[-1], # 删除最后下标
        start_date = datetime.datetime(2017, 1, 1), # 开始时间
        offset = relativedelta(months=3), # 时间间隔
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        stop_date = '20700101', # 超过时间则停止
        ):
        '''分时间增量读取数据'''
        # 创建文件夹
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            #删除最后一个文件
            file_list = os.listdir(save_dir)
            if len(file_list)>0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir,file_list[del_index]))
                    print('删除最后一个文件:', file_list[del_index])
        select_index = -1
        # start_date = datetime.datetime(2017, 1, 1)
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            # self.logger.info('file_path: %s', file_path)
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                # self.logger.info('data: %d', data.columns)
                if len(data)>0:
                    select_index+=1
                    # 排序
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    break
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
pass
class CustomIMSGetDatas(IMSGetDatasBase):
    def __init__(
        self,
        host='192.168.13.134',
        port=4000,
        database='test_ims',
        user='root',
        password='rootimmsadmin',
        save_dir='./hjx/data/export_ims_data',
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_ims_w_amt_pro(self):
        '''年月系列占比数据'''
        file_path = os.path.join(self.save_dir,'ims_w_amt_pro.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            # if not self.get_last_time(file_path):
            #     return
            sql = 'SELECT * FROM ims_w_amt_pro'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'serial_forecast_proportion': 'forecast_proportion',
            })
            data.to_csv(file_path)
            # # 更新超时时间
            # self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
pass
def get_datas(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    ur_bi_get_datas = CustomIMSGetDatas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # 年月系列占比数据
        logger.info('正在查询年月系列占比数据...')
        ur_bi_get_datas.get_ims_w_amt_pro()
        logger.info('查询年月系列占比数据完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外抛出异常
    finally:
        ur_bi_get_datas.close()
pass
def getdata_export_ims(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    get_datas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
pass

到此这篇关于Python读取Hive数据库实现代码详解的文章就介绍到这了,更多相关Python读取Hive数据库内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 在python中使用pyspark读写Hive数据操作

    1.读Hive表数据 pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL语句从hive里面查询需要的数据,代码如下: from pyspark.sql import HiveContext,SparkSession _SPARK_HOST = "spark://spark-master:7077" _APP_NAME = "test" spa

  • 如何在python中写hive脚本

    这篇文章主要介绍了如何在python中写hive脚本,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.直接执行.sql脚本 import numpy as np import pandas as pd import lightgbm as lgb from pandas import DataFrame from sklearn.model_selection import train_test_split from io import St

  • 使用Python构造hive insert语句说明

    mysql可以使用nevicat导出insert语句用于数据构造,但是hive无法直接导出insert语句.我们可以先打印在hive命令行,然后使用脚本拼装成insert语句,进行数据构造. 手动copy到python脚本进行sql语句构造: def transformString(s): list_s = s.split('\t') print(len(list_s)) s_new = '' for item in list_s: s_new += '\"' + item.strip(' ')

  • python 操作hive pyhs2方式

    使用kerberos时 import pyhs2 class HiveClient: # 初始化 def __init__(self, db_host, user, password, database, port=10000, authMechanism="PLAIN", configuration=None): self.conn = pyhs2.connect(host=db_host, port=port, authMechanism=authMechanism, user=u

  • python 实现 hive中类似 lateral view explode的功能示例

    背景:加入现在有这样的数据,可能一条ocr代表两个label,并且label通过","分隔.我们想把数据转换成下面的. 原始数据: label ocr 日常行车服务,汽车资讯 去加油站,加完油后直接离开?最开心的可能是加油站的工作人员 社会民生 已致2死20伤 !景区突遭尘卷风袭击,孩子被卷上天!现场画面曝光 目标数据: label ocr 日常行车服务 去加油站,加完油后直接离开?最开心的可能是加油站的工作人员 汽车资讯 去加油站,加完油后直接离开?最开心的可能是加油站的工作人员 社

  • Python pandas 列转行操作详解(类似hive中explode方法)

    最近在工作上用到Python的pandas库来处理excel文件,遇到列转行的问题.找了一番资料后成功了,记录一下. 1. 如果需要爆炸的只有一列: df=pd.DataFrame({'A':[1,2],'B':[[1,2],[1,2]]}) df Out[1]: A B 0 1 [1, 2] 1 2 [1, 2] 如果要爆炸B这一列,可以直接用explode方法(前提是你的pandas的版本要高于或等于0.25) df.explode('B') A B 0 1 1 1 1 2 2 2 1 3

  • python导出hive数据表的schema实例代码

    本文研究的主要问题是python语言导出hive数据表的schema,分享了实现代码,具体如下. 为了避免运营提出无穷无尽的查询需求,我们决定将有查询价值的数据从mysql导入hive中,让他们使用HUE这个开源工具进行查询.想必他们对表结构不甚了解,还需要为之提供一个表结构说明,于是编写了一个脚本,从hive数据库中将每张表的字段即类型查询出来,代码如下: #coding=utf-8 import pyhs2 from xlwt import * hiveconn = pyhs2.connec

  • python处理数据,存进hive表的方法

    首先,公司的小组长给了我一个任务,把一个txt的文件中的部分内容,存进一个在hive中已有的表的相同结构的表中.所以我的流程主要有三个,首先,把数据处理成和hive中表相同结构的数据,然后仿照已有的hive中表的结构再创建一张新的数据表,最后把本地的txt文件上传到hive中新建的数据表中. 1:已有的数据表的结构和在hive表中的结构完全对不上,下面的图是原来hive中表的结构和小组长给我的txt中表的结构: 大家可以看出,我们原来的hive中表的字段一共有17个,而组长给我的表中的字段一共有

  • 对python读取CT医学图像的实例详解

    需要安装OpenCV和SimpleItk. SimpleItk比较简单,直接pip install SimpleItk即可. 代码如下: #coding:utf-8 import SimpleITK as sitk import cv2 #LKDS-00058,-102.655469971,108.188810974,438.759994507,12.2279986879 if __name__ == '__main__': filename = "F:/cancer_solution/data

  • python读取mnist数据集方法案例详解

    mnist手写数字数据集在机器学习中非常常见,这里记录一下用python从本地读取mnist数据集的方法. 数据集格式介绍 这部分内容网络上很常见,这里还是简明介绍一下.网络上下载的mnist数据集包含4个文件: 前两个分别是测试集的image和label,包含10000个样本.后两个是训练集的,包含60000个样本..gz表示这个一个压缩包,如果进行解压的话,会得到.ubyte格式的二进制文件. 上图是训练集的label和image数据的存储格式.两个文件最开始都有magic number和n

  • Python中协程用法代码详解

    本文研究的主要是python中协程的相关问题,具体介绍如下. Num01–>协程的定义 协程,又称微线程,纤程.英文名Coroutine. 首先我们得知道协程是啥?协程其实可以认为是比线程更小的执行单元. 为啥说他是一个执行单元,因为他自带CPU上下文.这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程. 只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的. Num02–>协程和线程的差异 那么这个过程看起来和线程差不多.其实不然, 线程切换从系统层面远不止保存和恢复 CP

  • Python简易计算器制作方法代码详解

    主要用到的工具是Python中的Tkinter库 比较简单 直接上图形界面和代码 引用Tkinter库 from tkinter import * 建立主窗口对象 window=Tk() #设置窗口对象 window.title('counting machine') window.geometry("350x280") window['bg']='red' 建立标签框以及标签(将运算字符串显示在上面) frame=LabelFrame(window,bg='yellow',width

  • python scrapy重复执行实现代码详解

    这篇文章主要介绍了python scrapy重复执行实现代码详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 Scrapy是一个为了爬取网站数据,提取结构性数据而编写的应用框架,我们只需要实现少量的代码,就能够快速的抓取 Scrapy模块: 1.scheduler:用来存放url队列 2.downloader:发送请求 3.spiders:提取数据和url 4.itemPipeline:数据保存 from twisted.internet i

  • Python 八个数据清洗实例代码详解

    如果你经历过数据清洗的过程,你就会明白我的意思.而这正是撰写这篇文章的目的——让读者更轻松地进行数据清洗工作. 事实上,我在不久前意识到,在进行数据清洗时,有一些数据具有相似的模式.也正是从那时起,我开始整理并编译了一些数据清洗代码,我认为这些代码也适用于其它的常见场景. 由于这些常见的场景涉及到不同类型的数据集,因此本文更加侧重于展示和解释这些代码可以用于完成哪些工作,以便读者更加方便地使用它们. 数据清洗小工具箱 在下面的代码片段中,数据清洗代码被封装在了一些函数中,代码的目的十分直观.你可

  • Python Matplotlib绘制动画的代码详解

    目录 matplotlib 动画 人口出生率 男女人口总数 雨滴 matplotlib 动画 我们想制作一个动画,其中正弦和余弦函数在屏幕上逐步绘制.首先需要告诉matplotlib我们想要制作一个动画,然后必须指定想要在每一帧绘制什么.一个常见的错误是重新绘制每一帧的所有内容,这会使整个过程非常缓慢.相反地,只能更新必要的内容,因为我们知道许多内容不会随着帧的变化而改变.对于折线图,我们将使用set_data方法更新绘图,剩下的工作由matplotlib完成. 注意随着动画移动的终点标记.原因

  • Python操作SQLite数据库的方法详解

    本文实例讲述了Python操作SQLite数据库的方法.分享给大家供大家参考,具体如下: SQLite简单介绍 SQLite数据库是一款非常小巧的嵌入式开源数据库软件,也就是说没有独立的维护进程,所有的维护都来自于程序本身.它是遵守ACID的关联式数据库管理系统,它的设计目标是嵌入式的,而且目前已经在很多嵌入式产品中使用了它,它占用资源非常的低,在嵌入式设备中,可能只需要几百K的内存就够了.它能够支持Windows/Linux/Unix等等主流的操作系统,同时能够跟很多程序语言相结合,比如 Tc

  • Python开发SQLite3数据库相关操作详解【连接,查询,插入,更新,删除,关闭等】

    本文实例讲述了Python开发SQLite3数据库相关操作.分享给大家供大家参考,具体如下: '''SQLite数据库是一款非常小巧的嵌入式开源数据库软件,也就是说 没有独立的维护进程,所有的维护都来自于程序本身. 在python中,使用sqlite3创建数据库的连接,当我们指定的数据库文件不存在的时候 连接对象会自动创建数据库文件:如果数据库文件已经存在,则连接对象不会再创建 数据库文件,而是直接打开该数据库文件. 连接对象可以是硬盘上面的数据库文件,也可以是建立在内存中的,在内存中的数据库

  • Python操作mongodb数据库的方法详解

    本文实例讲述了Python操作mongodb数据库的方法.分享给大家供大家参考,具体如下: 安装pymongo 下载pymongo: https://pypi.python.org/packages/82/26/f45f95841de5164c48e2e03aff7f0702e22cef2336238d212d8f93e91ea8/pymongo-3.4.0.tar.gz#md5=aa77f88e51e281c9f328cea701bb6f3e 安装pymongo: 解压后,cmd进入pymon

随机推荐