python如何解析复杂sql,实现数据库和表的提取的实例剖析

需求:

公司的数据分析师,提交一个sql, 一般都三四百行。由于数据安全的需要,不能开放所有的数据库和数据表给数据分析师查询,所以需要解析sql中的数据库和表,与权限管理系统中记录的数据库和表权限信息比对,实现非法查询的拦截。

解决办法:

在解决这个问题前,现在github找了一下轮子,发现python下面除了sql parse没什么好的解析数据库和表的轮轮。到是在java里面找到presto-parser解析的比较准。于是自己结合sql parse源码写了个类,供大家参考,测试了一下,检测还是准的。

测试sql

select
b.product_name "产品",
count(a.order_id) "订单量",
b.selling_price_max "销售价",
b.gross_profit_rate_max/100 "毛利率",
case when b.business_type =1 then '自营消化' when b.business_type =2 then '服务商消化' end "消化模式"
from(select 'CRM签单' label,date(d.update_ymd) close_ymd,c.product_name,c.product_id,
  a.order_id,cast(a.recipient_amount as double) amt,d.cost
  from mysql4.dataview_fenxiao.fx_order a
  left join mysql4.dataview_fenxiao.fx_order_task b on a.order_id = b.order_id
  left join mysql7.dataview_trade.ddc_product_info c on cast(c.product_id as varchar) = a.product_ids and c.snapshot_version = 'SELLING'
  inner join (select t1.par_order_id,max(t1.update_ymd) update_ymd,
        sum(case when t4.product2_type = 1 and t5.shop_id is not null then t5.price else t1.order_hosted_price end) cost
        from hive.bdc_dwd.dw_mk_order t1
        left join hive.bdc_dwd.dw_mk_order_status t2 on t1.order_id = t2.order_id and t2.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
        left join mysql7.dataview_trade.mk_order_merchant t3 on t1.order_id = t3.order_id
        left join mysql7.dataview_trade.ddc_product_info t4 on t4.product_id = t3.MERCHANT_ID and t4.snapshot_version = 'SELLING'
        left join mysql4.dataview_scrm.sc_tprc_product_info t5 on t5.product_id = t4.product_id and t5.shop_id = t1.seller_id
        where t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
        and t2.valid_state in (100,200) ------有效订单
        and t1.order_mode = 10  --------产品消耗订单
        and t2.complete_state = 1 -----订单已经完成
        group by t1.par_order_id
  ) d on d.par_order_id = b.task_order_id
  where c.product_type = 0 and date(from_unixtime(a.last_recipient_time)) > date('2016-01-01') and a.payee_type <> 1 -----------已收款
  UNION ALL
  select '企业管家消耗' label,date(c.update_ymd) close_ymd,b.product_name,b.product_id,
  a.task_id,(case when a.yb_price = 0 and b.product2_type = 1 then b.selling_price_min else a.yb_price end) amt,
  (case when a.yb_price = 0 and b.product2_type = 2 then 0 when b.product2_type = 1 and e.shop_id is not null then e.price else c.order_hosted_price end) cost
  from mysql8.dataview_tprc.tprc_task a
  left join mysql7.dataview_trade.ddc_product_info b on a.product_id = b.product_id and b.snapshot_version = 'SELLING'
  inner join hive.bdc_dwd.dw_mk_order c on a.order_id = c.order_id and c.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
  left join hive.bdc_dwd.dw_mk_order_status d on d.order_id = c.order_id and d.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
  left join mysql4.dataview_scrm.sc_tprc_product_info e on e.product_id = b.product_id and e.shop_id = c.seller_id
  where d.valid_state in (100,200) and d.complete_state = 1 and c.order_mode = 10
  union ALL
  select '交易管理系统' label,date(t6.close_ymd) close_ymd,t4.product_name,t4.product_id,
  t1.order_id,(t1.order_hosted_price-t1.order_refund_price) amt,
  (case when t1.order_mode <> 11 then t7.user_amount when t1.order_mode = 11 and t4.product2_type = 1 and t5.shop_id is not null then t5.price else t8.cost end) cost
  from hive.bdc_dwd.dw_mk_order t1
  left join hive.bdc_dwd.dw_mk_order_business t2 on t1.order_id = t2.order_id and t2.acct_day=substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
  left join mysql7.dataview_trade.mk_order_merchant t3 on t1.order_id = t3.order_id
  left join mysql7.dataview_trade.ddc_product_info t4 on t4.product_id = t3.MERCHANT_ID and t4.snapshot_version = 'SELLING'
  left join mysql4.dataview_scrm.sc_tprc_product_info t5 on t5.product_id = t4.product_id and t5.shop_id = t1.seller_id
  left join hive.bdc_dwd.dw_fact_task_ss_daily t6 on t6.task_id = t2.task_id and t6.acct_time=date_format(date_add('day',-1,current_date),'%Y-%m-%d')
  left join (select a.task_id,sum(a.user_amount) user_amount
        from hive.bdc_dwd.dw_fn_deal_asyn_order a
        where a.is_new=1 and a.service='Trade_Payment' and a.state=1 and a.acct_day=substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
        group by a.task_id)t7 on t7.task_id = t2.task_id
  left join (select t1.par_order_id,sum(t1.order_hosted_price - t1.order_refund_price) cost
        from hive.bdc_dwd.dw_mk_order t1
        where t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2) and t1.order_type = 1 and t1.order_stype = 4 and t1.order_mode = 12
        group by t1.par_order_id) t8 on t1.order_id = t8.par_order_id
  where t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
  and t1.order_type = 1 and t1.order_stype in (4,5) and t1.order_mode <> 12 and t4.product_id is not null and t1.order_hosted_price > 0 and t6.is_deal = 1 and t6.close_ymd >= '2018-12-31'
)a
left join mysql7.dataview_trade.ddc_product_info b on a.product_id = b.product_id and b.snapshot_version = 'SELLING'
where b.product2_type = 1 -------标品
and close_ymd between DATE_ADD('day',-7,CURRENT_DATE) and DATE_ADD('day',-1,CURRENT_DATE)
GROUP BY b.product_name,
b.selling_price_max,
b.gross_profit_rate_max/100,
b.actrul_supply_num,
case when b.business_type =1 then '自营消化' when b.business_type =2 then '服务商消化' end
order by count(a.order_id) desc
limit 10

可以看到该sql比较杂,也没有格式化,不太好提取数据库和表。所以第一步需要对sql进行格式化

直接上代码:

# coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import sqlparse
from sqlparse.sql import Identifier, IdentifierList
from sqlparse.tokens import Keyword, Name

RESULT_OPERATIONS = {'UNION', 'INTERSECT', 'EXCEPT', 'SELECT'}
ON_KEYWORD = 'ON'
PRECEDES_TABLE_NAME = {'FROM', 'JOIN', 'DESC', 'DESCRIBE', 'WITH'}

class BaseExtractor(object):
  def __init__(self, sql_statement):
    self.sql = sqlparse.format(sql_statement, reindent=True, keyword_case='upper')
    self._table_names = set()
    self._alias_names = set()
    self._limit = None
    self._parsed = sqlparse.parse(self.stripped())
    for statement in self._parsed:
      self.__extract_from_token(statement)
      self._limit = self._extract_limit_from_query(statement)
    self._table_names = self._table_names - self._alias_names

  @property
  def tables(self):
    return self._table_names

  @property
  def limit(self):
    return self._limit

  def is_select(self):
    return self._parsed[0].get_type() == 'SELECT'

  def is_explain(self):
    return self.stripped().upper().startswith('EXPLAIN')

  def is_readonly(self):
    return self.is_select() or self.is_explain()

  def stripped(self):
    return self.sql.strip(' \t\n;')

  def get_statements(self):
    statements = []
    for statement in self._parsed:
      if statement:
        sql = str(statement).strip(' \n;\t')
        if sql:
          statements.append(sql)
    return statements

  @staticmethod
  def __precedes_table_name(token_value):
    for keyword in PRECEDES_TABLE_NAME:
      if keyword in token_value:
        return True
    return False

  @staticmethod
  def get_full_name(identifier):
    if len(identifier.tokens) > 1 and identifier.tokens[1].value == '.':
      return '{}.{}'.format(identifier.tokens[0].value,
                 identifier.tokens[2].value)
    return identifier.get_real_name()

  @staticmethod
  def __is_result_operation(keyword):
    for operation in RESULT_OPERATIONS:
      if operation in keyword.upper():
        return True
    return False

  @staticmethod
  def __is_identifier(token):
    return isinstance(token, (IdentifierList, Identifier))

  def __process_identifier(self, identifier):
    if '(' not in '{}'.format(identifier):
      self._table_names.add(self.get_full_name(identifier))
      return

    # store aliases
    if hasattr(identifier, 'get_alias'):
      self._alias_names.add(identifier.get_alias())
    if hasattr(identifier, 'tokens'):
      # some aliases are not parsed properly
      if identifier.tokens[0].ttype == Name:
        self._alias_names.add(identifier.tokens[0].value)
    self.__extract_from_token(identifier)

  def as_create_table(self, table_name, overwrite=False):
    exec_sql = ''
    sql = self.stripped()
    if overwrite:
      exec_sql = 'DROP TABLE IF EXISTS {};\n'.format(table_name)
    exec_sql += 'CREATE TABLE {} AS \n{}'.format(table_name, sql)
    return exec_sql

  def __extract_from_token(self, token):
    if not hasattr(token, 'tokens'):
      return

    table_name_preceding_token = False

    for item in token.tokens:
      if item.is_group and not self.__is_identifier(item):
        self.__extract_from_token(item)

      if item.ttype in Keyword:
        if self.__precedes_table_name(item.value.upper()):
          table_name_preceding_token = True
          continue

      if not table_name_preceding_token:
        continue

      if item.ttype in Keyword or item.value == ',':
        if (self.__is_result_operation(item.value) or
            item.value.upper() == ON_KEYWORD):
          table_name_preceding_token = False
          continue
        # FROM clause is over
        break

      if isinstance(item, Identifier):
        self.__process_identifier(item)

      if isinstance(item, IdentifierList):
        for token in item.tokens:
          if self.__is_identifier(token):
            self.__process_identifier(token)

  def _get_limit_from_token(self, token):
    if token.ttype == sqlparse.tokens.Literal.Number.Integer:
      return int(token.value)
    elif token.is_group:
      return int(token.get_token_at_offset(1).value)

  def _extract_limit_from_query(self, statement):
    limit_token = None
    for pos, item in enumerate(statement.tokens):
      if item.ttype in Keyword and item.value.lower() == 'limit':
        limit_token = statement.tokens[pos + 2]
        return self._get_limit_from_token(limit_token)

  def get_query_with_new_limit(self, new_limit):
    if not self._limit:
      return self.sql + ' LIMIT ' + str(new_limit)
    limit_pos = None
    tokens = self._parsed[0].tokens
    # Add all items to before_str until there is a limit
    for pos, item in enumerate(tokens):
      if item.ttype in Keyword and item.value.lower() == 'limit':
        limit_pos = pos
        break
    limit = tokens[limit_pos + 2]
    if limit.ttype == sqlparse.tokens.Literal.Number.Integer:
      tokens[limit_pos + 2].value = new_limit
    elif limit.is_group:
      tokens[limit_pos + 2].value = (
        '{}, {}'.format(next(limit.get_identifiers()), new_limit)
      )

    str_res = ''
    for i in tokens:
      str_res += str(i.value)
    return str_res

class SqlExtractor(BaseExtractor):
  """提取sql语句"""

  @staticmethod
  def get_full_name(identifier, including_dbs=False):
    if len(identifier.tokens) > 1 and identifier.tokens[1].value == '.':
      a = identifier.tokens[0].value
      b = identifier.tokens[2].value
      db_table = (a, b)
      full_tree = '{}.{}'.format(a, b)
      if len(identifier.tokens) == 3:
        return full_tree
      else:
        i = identifier.tokens[3].value
        c = identifier.tokens[4].value
        if i == ' ':
          return full_tree
        full_tree = '{}.{}.{}'.format(a, b, c)
        return full_tree
    return None, None

if __name__ == '__main__':
  sql = """select
  b.product_name "产品",
  count(a.order_id) "订单量",
  b.selling_price_max "销售价",
  b.gross_profit_rate_max/100 "毛利率",
  case when b.business_type =1 then '自营消化' when b.business_type =2 then '服务商消化' end "消化模式"
  from(select 'CRM签单' label,date(d.update_ymd) close_ymd,c.product_name,c.product_id,
    a.order_id,cast(a.recipient_amount as double) amt,d.cost
    from mysql4.dataview_fenxiao.fx_order a
    left join mysql4.dataview_fenxiao.fx_order_task b on a.order_id = b.order_id
    left join mysql7.dataview_trade.ddc_product_info c on cast(c.product_id as varchar) = a.product_ids and c.snapshot_version = 'SELLING'
    inner join (select t1.par_order_id,max(t1.update_ymd) update_ymd,
          sum(case when t4.product2_type = 1 and t5.shop_id is not null then t5.price else t1.order_hosted_price end) cost
          from hive.bdc_dwd.dw_mk_order t1
          left join hive.bdc_dwd.dw_mk_order_status t2 on t1.order_id = t2.order_id and t2.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
          left join mysql7.dataview_trade.mk_order_merchant t3 on t1.order_id = t3.order_id
          left join mysql7.dataview_trade.ddc_product_info t4 on t4.product_id = t3.MERCHANT_ID and t4.snapshot_version = 'SELLING'
          left join mysql4.dataview_scrm.sc_tprc_product_info t5 on t5.product_id = t4.product_id and t5.shop_id = t1.seller_id
          where t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
          and t2.valid_state in (100,200) ------有效订单
          and t1.order_mode = 10  --------产品消耗订单
          and t2.complete_state = 1 -----订单已经完成
          group by t1.par_order_id
    ) d on d.par_order_id = b.task_order_id
    where c.product_type = 0 and date(from_unixtime(a.last_recipient_time)) > date('2016-01-01') and a.payee_type <> 1 -----------已收款
    UNION ALL
    select '企业管家消耗' label,date(c.update_ymd) close_ymd,b.product_name,b.product_id,
    a.task_id,(case when a.yb_price = 0 and b.product2_type = 1 then b.selling_price_min else a.yb_price end) amt,
    (case when a.yb_price = 0 and b.product2_type = 2 then 0 when b.product2_type = 1 and e.shop_id is not null then e.price else c.order_hosted_price end) cost
    from mysql8.dataview_tprc.tprc_task a
    left join mysql7.dataview_trade.ddc_product_info b on a.product_id = b.product_id and b.snapshot_version = 'SELLING'
    inner join hive.bdc_dwd.dw_mk_order c on a.order_id = c.order_id and c.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
    left join hive.bdc_dwd.dw_mk_order_status d on d.order_id = c.order_id and d.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
    left join mysql4.dataview_scrm.sc_tprc_product_info e on e.product_id = b.product_id and e.shop_id = c.seller_id
    where d.valid_state in (100,200) and d.complete_state = 1 and c.order_mode = 10
    union ALL
    select '交易管理系统' label,date(t6.close_ymd) close_ymd,t4.product_name,t4.product_id,
    t1.order_id,(t1.order_hosted_price-t1.order_refund_price) amt,
    (case when t1.order_mode <> 11 then t7.user_amount when t1.order_mode = 11 and t4.product2_type = 1 and t5.shop_id is not null then t5.price else t8.cost end) cost
    from hive.bdc_dwd.dw_mk_order t1
    left join hive.bdc_dwd.dw_mk_order_business t2 on t1.order_id = t2.order_id and t2.acct_day=substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
    left join mysql7.dataview_trade.mk_order_merchant t3 on t1.order_id = t3.order_id
    left join mysql7.dataview_trade.ddc_product_info t4 on t4.product_id = t3.MERCHANT_ID and t4.snapshot_version = 'SELLING'
    left join mysql4.dataview_scrm.sc_tprc_product_info t5 on t5.product_id = t4.product_id and t5.shop_id = t1.seller_id
    left join hive.bdc_dwd.dw_fact_task_ss_daily t6 on t6.task_id = t2.task_id and t6.acct_time=date_format(date_add('day',-1,current_date),'%Y-%m-%d')
    left join (select a.task_id,sum(a.user_amount) user_amount
          from hive.bdc_dwd.dw_fn_deal_asyn_order a
          where a.is_new=1 and a.service='Trade_Payment' and a.state=1 and a.acct_day=substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
          group by a.task_id)t7 on t7.task_id = t2.task_id
    left join (select t1.par_order_id,sum(t1.order_hosted_price - t1.order_refund_price) cost
          from hive.bdc_dwd.dw_mk_order t1
          where t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2) and t1.order_type = 1 and t1.order_stype = 4 and t1.order_mode = 12
          group by t1.par_order_id) t8 on t1.order_id = t8.par_order_id
    where t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) as varchar),9,2)
    and t1.order_type = 1 and t1.order_stype in (4,5) and t1.order_mode <> 12 and t4.product_id is not null and t1.order_hosted_price > 0 and t6.is_deal = 1 and t6.close_ymd >= '2018-12-31'
  )a
  left join mysql7.dataview_trade.ddc_product_info b on a.product_id = b.product_id and b.snapshot_version = 'SELLING'
  where b.product2_type = 1 -------标品
  and close_ymd between DATE_ADD('day',-7,CURRENT_DATE) and DATE_ADD('day',-1,CURRENT_DATE)
  GROUP BY b.product_name,
  b.selling_price_max,
  b.gross_profit_rate_max/100,
  b.actrul_supply_num,
  case when b.business_type =1 then '自营消化' when b.business_type =2 then '服务商消化' end
  order by count(a.order_id) desc
  limit 10"""
  sql_extractor = SqlExtractor(sql)

  print(sql_extractor.sql)
  print(sql_extractor.tables)

输出结果:

{'mysql8.dataview_tprc.tprc_task', 'hive.bdc_dwd.dw_mk_order', 'mysql4.dataview_fenxiao.fx_order_task', 'mysql4.dataview_fenxiao.fx_order', 'hive.bdc_dwd.dw_mk_order_business', 'mysql7.dataview_trade.mk_order_merchant', 'mysql4.dataview_scrm.sc_tprc_product_info', 'hive.bdc_dwd.dw_fn_deal_asyn_order', 'hive.bdc_dwd.dw_fact_task_ss_daily', 'mysql7.dataview_trade.ddc_product_info', 'hive.bdc_dwd.dw_mk_order_status'}

格式化结果:

SELECT b.product_name "产品",
    count(a.order_id) "订单量",
    b.selling_price_max "销售价",
    b.gross_profit_rate_max/100 "毛利率",
    CASE
      WHEN b.business_type =1 THEN '自营消化'
      WHEN b.business_type =2 THEN '服务商消化'
    END "消化模式" from
 (SELECT 'CRM签单' label,date(d.update_ymd) close_ymd,c.product_name,c.product_id, a.order_id,cast(a.recipient_amount AS DOUBLE) amt,d.cost
  FROM mysql4.dataview_fenxiao.fx_order a
  LEFT JOIN mysql4.dataview_fenxiao.fx_order_task b ON a.order_id = b.order_id
  LEFT JOIN mysql7.dataview_trade.ddc_product_info c ON cast(c.product_id AS varchar) = a.product_ids
  AND c.snapshot_version = 'SELLING'
  INNER JOIN
   (SELECT t1.par_order_id,max(t1.update_ymd) update_ymd, sum(CASE
                                  WHEN t4.product2_type = 1
                                     AND t5.shop_id IS NOT NULL THEN t5.price
                                  ELSE t1.order_hosted_price
                                END) cost
   FROM hive.bdc_dwd.dw_mk_order t1
   LEFT JOIN hive.bdc_dwd.dw_mk_order_status t2 ON t1.order_id = t2.order_id
   AND t2.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) AS varchar),9,2)
   LEFT JOIN mysql7.dataview_trade.mk_order_merchant t3 ON t1.order_id = t3.order_id
   LEFT JOIN mysql7.dataview_trade.ddc_product_info t4 ON t4.product_id = t3.MERCHANT_ID
   AND t4.snapshot_version = 'SELLING'
   LEFT JOIN mysql4.dataview_scrm.sc_tprc_product_info t5 ON t5.product_id = t4.product_id
   AND t5.shop_id = t1.seller_id
   WHERE t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) AS varchar),9,2)
    AND t2.valid_state IN (100,200)------有效订单

    AND t1.order_mode = 10 --------产品消耗订单

    AND t2.complete_state = 1 -----订单已经完成

   GROUP BY t1.par_order_id ) d ON d.par_order_id = b.task_order_id
  WHERE c.product_type = 0
   AND date(from_unixtime(a.last_recipient_time)) > date('2016-01-01')
   AND a.payee_type <> 1 -----------已收款

  UNION ALL SELECT '企业管家消耗' label,date(c.update_ymd) close_ymd,b.product_name,b.product_id, a.task_id,(CASE
                    WHEN a.yb_price = 0
                         AND b.product2_type = 1 THEN b.selling_price_min
                            ELSE a.yb_price
                       END) amt, (CASE
                     WHEN a.yb_price = 0
                       AND b.product2_type = 2 THEN 0
                         WHEN b.product2_type = 1
                           AND e.shop_id IS NOT NULL THEN e.price
                          ELSE c.order_hosted_price
                       END) cost
  FROM mysql8.dataview_tprc.tprc_task a
  LEFT JOIN mysql7.dataview_trade.ddc_product_info b ON a.product_id = b.product_id
  AND b.snapshot_version = 'SELLING'
  INNER JOIN hive.bdc_dwd.dw_mk_order c ON a.order_id = c.order_id
  AND c.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) AS varchar),9,2)
  LEFT JOIN hive.bdc_dwd.dw_mk_order_status d ON d.order_id = c.order_id
  AND d.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) AS varchar),9,2)
  LEFT JOIN mysql4.dataview_scrm.sc_tprc_product_info e ON e.product_id = b.product_id
  AND e.shop_id = c.seller_id
  WHERE d.valid_state IN (100,200)
   AND d.complete_state = 1
   AND c.order_mode = 10
  UNION ALL SELECT '交易管理系统' label,date(t6.close_ymd) close_ymd,t4.product_name,t4.product_id, t1.order_id,(t1.order_hosted_price-t1.order_refund_price) amt, (CASE
              WHEN t1.order_mode <> 11 THEN t7.user_amount
              WHEN t1.order_mode = 11
                AND t4.product2_type = 1
                AND t5.shop_id IS NOT NULL THEN t5.price
              ELSE t8.cost
            END) cost
  FROM hive.bdc_dwd.dw_mk_order t1
  LEFT JOIN hive.bdc_dwd.dw_mk_order_business t2 ON t1.order_id = t2.order_id
  AND t2.acct_day=substring(cast(DATE_ADD('day',-1,CURRENT_DATE) AS varchar),9,2)
  LEFT JOIN mysql7.dataview_trade.mk_order_merchant t3 ON t1.order_id = t3.order_id
  LEFT JOIN mysql7.dataview_trade.ddc_product_info t4 ON t4.product_id = t3.MERCHANT_ID
  AND t4.snapshot_version = 'SELLING'
  LEFT JOIN mysql4.dataview_scrm.sc_tprc_product_info t5 ON t5.product_id = t4.product_id
  AND t5.shop_id = t1.seller_id
  LEFT JOIN hive.bdc_dwd.dw_fact_task_ss_daily t6 ON t6.task_id = t2.task_id
  AND t6.acct_time=date_format(date_add('day',-1,CURRENT_DATE),'%Y-%m-%d')
  LEFT JOIN
   (SELECT a.task_id,sum(a.user_amount) user_amount
   FROM hive.bdc_dwd.dw_fn_deal_asyn_order a
   WHERE a.is_new=1
    AND a.service='Trade_Payment'
    AND a.state=1
    AND a.acct_day=substring(cast(DATE_ADD('day',-1,CURRENT_DATE) AS varchar),9,2)
   GROUP BY a.task_id)t7 ON t7.task_id = t2.task_id
  LEFT JOIN
   (SELECT t1.par_order_id,sum(t1.order_hosted_price - t1.order_refund_price) cost
   FROM hive.bdc_dwd.dw_mk_order t1
   WHERE t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) AS varchar),9,2)
    AND t1.order_type = 1
    AND t1.order_stype = 4
    AND t1.order_mode = 12
   GROUP BY t1.par_order_id) t8 ON t1.order_id = t8.par_order_id
  WHERE t1.acct_day = substring(cast(DATE_ADD('day',-1,CURRENT_DATE) AS varchar),9,2)
   AND t1.order_type = 1
   AND t1.order_stype IN (4,5)
   AND t1.order_mode <> 12
   AND t4.product_id IS NOT NULL
   AND t1.order_hosted_price > 0
   AND t6.is_deal = 1
   AND t6.close_ymd >= '2018-12-31' )a
LEFT JOIN mysql7.dataview_trade.ddc_product_info b ON a.product_id = b.product_id
AND b.snapshot_version = 'SELLING'
WHERE b.product2_type = 1 -------标品
AND close_ymd BETWEEN DATE_ADD('day',-7,CURRENT_DATE) AND DATE_ADD('day',-1,CURRENT_DATE)
GROUP BY b.product_name,
     b.selling_price_max,
     b.gross_profit_rate_max/100,
     b.actrul_supply_num,
     CASE
       WHEN b.business_type =1 THEN '自营消化'
       WHEN b.business_type =2 THEN '服务商消化'
     END
ORDER BY count(a.order_id) DESC
LIMIT 10

以上这篇python如何解析复杂sql,实现数据库和表的提取的实例剖析就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Python如何读取MySQL数据库表数据

    本文实例为大家分享了Python读取MySQL数据库表数据的具体代码,供大家参考,具体内容如下 环境:Python 3.6 ,Window 64bit 目的:从MySQL数据库读取目标表数据,并处理 代码: # -*- coding: utf-8 -*- import pandas as pd import pymysql ## 加上字符集参数,防止中文乱码 dbconn=pymysql.connect( host="**********", database="kimbo&

  • Python从数据库读取大量数据批量写入文件的方法

    使用机器学习训练数据时,如果数据量较大可能我们不能够一次性将数据加载进内存,这时我们需要将数据进行预处理,分批次加载进内存. 下面是代码作用是将数据从数据库读取出来分批次写入txt文本文件,方便我们做数据的预处理和训练机器学习模型. #%% import pymssql as MySQLdb #这里是python3 如果你是python2.x的话,import MySQLdb #数据库连接属性 hst = '188.10.34.18' usr = 'sa' passwd = 'p@ssw0rd'

  • Python操作MySQL数据库9个实用实例

    在Windows平台上安装mysql模块用于Python开发 用python连接mysql的时候,需要用的安装版本,源码版本容易有错误提示.下边是打包了32与64版本. MySQL-python-1.2.3.win32-py2.7.exe MySQL-python-1.2.3.win-amd64-py2.7.exe 实例 1.取得 MYSQL 的版本 # -*- coding: UTF-8 -*- #安装 MYSQL DB for python import MySQLdb as mdb con

  • Python进行数据提取的方法总结

    准备工作 首先是准备工作,导入需要使用的库,读取并创建数据表取名为loandata. import numpy as np import pandas as pd loandata=pd.DataFrame(pd.read_excel('loan_data.xlsx')) 设置索引字段 在开始提取数据前,先将member_id列设置为索引字段.然后开始提取数据. Loandata = loandata.set_index('member_id') 按行提取信息 第一步是按行提取数据,例如提取某个

  • python如何解析复杂sql,实现数据库和表的提取的实例剖析

    需求: 公司的数据分析师,提交一个sql, 一般都三四百行.由于数据安全的需要,不能开放所有的数据库和数据表给数据分析师查询,所以需要解析sql中的数据库和表,与权限管理系统中记录的数据库和表权限信息比对,实现非法查询的拦截. 解决办法: 在解决这个问题前,现在github找了一下轮子,发现python下面除了sql parse没什么好的解析数据库和表的轮轮.到是在java里面找到presto-parser解析的比较准.于是自己结合sql parse源码写了个类,供大家参考,测试了一下,检测还是

  • PHP操作SQL Server数据库实现表的改查与统计

    目录 今天上午做表格数据的检查与修改,涉及到PHP对MS SQL Server数据表的查询.统计与修改. 平时我也不记编码和命令,到了用的时候都再查手册,这往往很耗费时间,现在写博客有个好处就是把有用的代码记录下来,下次就直接复制,修改一下就直接用了. 这样可以省去很多时间. 下面是检索三个表来更新数据,即主表有信息不全,检索两个其他的信息表进行补录. <?php require 'pspLinkConfig.php';//加载数据库 $sql = "select C24 from cwk

  • SQL Server 2012 多表连接查询功能实例代码

    废话不多说了,直接给大家贴代码了,具体代码如下所示: -- 交叉连接产生笛卡尔值 (X*Y) SELECT * FROM Student cross Join dbo.ClassInfo --另外一种写法 SELECT * FROM Student , ClassInfo -- 内连接 (Inner 可以省略) SELECT * FROM Student JOIN dbo.ClassInfo ON dbo.Student.Class = dbo.ClassInfo.ID; -- Inner Jo

  • SQL Server数据库删除数据集中重复数据实例讲解

    SQL Server数据库操作中,有时对于表中的结果集,满足一定规则我们则认为是重复数据,而这些重复数据需要删除.如何删除呢?本文我们通过一个例子来加以说明. 例子如下: 如下只要companyName,invoiceNumber,customerNumber三者都相同,我们则认为是重复数据,下面的例子演示了如何删除. declare @InvoiceListMaster table ( ID int identity primary key , companyName Nchar(20), i

  • SQL Server数据库创建表及其约束条件的操作方法

    目录 1.创建数据库: 2.创建表: 3.约束: 3.1.Not  Null 3.2.UNIQUE 3.3.PRIMARY KEY 3.4.FOREIGN KEY 3.5.check 3.6.DEFAULT 撤销 DEFAULT 约束: 1.创建数据库: CREATE DATABASE my_db; 2.创建表: CREATE TABLE Persons ( Id_P int, LastName varchar(255), FirstName varchar(255), Address varc

  • sql lite 数据库之间表复制的方法

    现在的思路:打开一个目的库,打开一个源库,因为目的库中的某些表内容和源库的表内容不一致,所以需要把源库中的某些表导入到目的库中,步骤如下: 1.通过sql lite打开目的库,然后选中目的库右键,弹出有个date transfer wizard 的菜单,单击后,打开此窗体,里面有二个选项,其中export为导出,import为导入,此时选中import选项. 2.选中import选项后,对应下面资源有三个选项.(1)annother sqllite database  (2)sql script

  • Android Room数据库多表查询的使用实例

    Android-Room数据库(介绍) 前言 在SQLite数据库中,我们可以指定对象之间的关系,因此我们可以将一个或多个对象与一个或多个其他对象绑定.这就是所谓的一对多和多对多的关系. 既然要多表查询,所以表之间就得有关联.这时候我们就得使用新的注解符@ForeignKey 接下来的内容,就需要上节的内容了 @Entity public class Company { @PrimaryKey(autoGenerate = true) private int id; private String

  • SQL Server数据库中批量导入数据的四种方法总结

    在软件项目实施的时候,数据导入一直是项目人员比较头疼的问题.其实,在SQL Server中集成了很多成批导入数据的方法.有些项目实施顾问头疼的问题,在我们数据库管理员眼中,是小菜一碟.现在的重点就是,如何让用户了解这些方法,让数据导入变得轻松一些. 第一:使用Select Into语句 若企业数据库都采用的是SQL Server数据库的话,则可以利用Select Into语句来实现数据的导入.Select Into语句,他的作用就是把数据从另外一个数据库中查询出来,然后加入到某个用户指定的表中.

  • Python 解析pymysql模块操作数据库的方法

    pymysql 是 python 用来操作MySQL的第三方库,下面具体介绍和使用该库的基本方法. 1.建立数据库连接 通过 connect 函数中 parameter 参数 建立连接,连接成功返回Connection对象 import pymysql #建立数据库连接 connection = pymysql.connect(host = 'localhost', user = 'root', password = '123456', database = 'mydb', charset =

  • Python sqlparse解析SQL表血缘追踪实现

    目录 引言 一.主线任务 1.数据治理 2.血缘追踪 3.SQL表血缘 二.实现过程 1.目标效果 2.代码实现 1.功能函数识别 2.SQL标准格式 3.解析AST树 4.最终效果: 引言 SQLparse的开源库解析中就说过自己在寻找在python编程内可行的SQL血缘解析,JAVA去解析Hive的源码实践的话我还是打算放到后期来做,先把Python能够实现的先实现完.主要是HiveSQL的底层就是JAVA代码,怎么改写还是绕不开JAVA的.不过上篇系列我有提到过sqlparse,其实这个库

随机推荐