DataFrame:通过SparkSql将scala类转为DataFrame的方法

如下所示:

import java.text.DecimalFormat
import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
 * Created by silentwolf on 2016/6/3.
 */

case class UserTag(SUUID: String,
     MAN: Float,
     WOMAN: Float,
     AGE10_19: Float,
     AGE20_29: Float,
     AGE30_39: Float,
     AGE40_49: Float,
     AGE50_59: Float,
     GAME: Float,
     MOVIE: Float,
     MUSIC: Float,
     ART: Float,
     POLITICS_NEWS: Float,
     FINANCIAL: Float,
     EDUCATION_TRAINING: Float,
     HEALTH_CARE: Float,
     TRAVEL: Float,
     AUTOMOBILE: Float,
     HOUSE_PROPERTY: Float,
     CLOTHING_ACCESSORIES: Float,
     BEAUTY: Float,
     IT: Float,
     BABY_PRODUCT: Float,
     FOOD_SERVICE: Float,
     HOME_FURNISHING: Float,
     SPORTS: Float,
     OUTDOOR_ACTIVITIES: Float,
     MEDICINE: Float
     )

object UserTagTable {

 val LOG = LoggerFactory.getLogger(UserOverviewFirst.getClass)

 val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"

 def main(args: Array[String]) {

 var startTime = System.currentTimeMillis()

 val conf: com.typesafe.config.Config = ConfigFactory.load()

 val sc = new SparkContext()

 val sqlContext = new SQLContext(sc)

 var df1: DataFrame = null

 if (args.length == 0) {
  println("请输入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11")
 }
 else {

  var appkey = args(0)

  var lastdate = args(1)

  df1 = loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate)

  df1.registerTempTable("suuidTable")

  sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))
  sqlContext.udf.register("intToString", (b: Long) => intToString(b))
  import sqlContext.implicits._

  //***重点***:将临时表中的suuid和自定函数中Json数据,放入UserTag中。
 sqlContext.sql(" select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable group by suuid").map { case Row(suuid: String, taginfo: String) =>
  val taginfoObj = JSON.parseObject(taginfo)
  UserTag(suuid.toString,
   taginfoObj.getFloat("man"),
   taginfoObj.getFloat("woman"),
   taginfoObj.getFloat("age10_19"),
   taginfoObj.getFloat("age20_29"),
   taginfoObj.getFloat("age30_39"),
   taginfoObj.getFloat("age40_49"),
   taginfoObj.getFloat("age50_59"),
   taginfoObj.getFloat("game"),
   taginfoObj.getFloat("movie"),
   taginfoObj.getFloat("music"),
   taginfoObj.getFloat("art"),
   taginfoObj.getFloat("politics_news"),
   taginfoObj.getFloat("financial"),
   taginfoObj.getFloat("education_training"),
   taginfoObj.getFloat("health_care"),
   taginfoObj.getFloat("travel"),
   taginfoObj.getFloat("automobile"),
   taginfoObj.getFloat("house_property"),
   taginfoObj.getFloat("clothing_accessories"),
   taginfoObj.getFloat("beauty"),
   taginfoObj.getFloat("IT"),
   taginfoObj.getFloat("baby_Product"),
   taginfoObj.getFloat("food_service"),
   taginfoObj.getFloat("home_furnishing"),
   taginfoObj.getFloat("sports"),
   taginfoObj.getFloat("outdoor_activities"),
   taginfoObj.getFloat("medicine")
  )}.toDF().registerTempTable("resultTable")

  val resultDF = sqlContext.sql(s"select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," +
  "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," +
  "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," +
  "MEDICINE from resultTable WHERE SUUID IS NOT NULL")
  resultDF.write.mode(SaveMode.Overwrite).options(
  Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url"))
  ).format("org.apache.phoenix.spark").save()

 }
 }

 def intToString(suuid: Long): String = {
 suuid.toString()
 }

 def userTagInfo(num1: String): String = {

 var de = new DecimalFormat("0.00")
 var mannum = de.format(math.random).toFloat
 var man = mannum
 var woman = de.format(1 - mannum).toFloat

 var age10_19num = de.format(math.random * 0.2).toFloat
 var age20_29num = de.format(math.random * 0.2).toFloat
 var age30_39num = de.format(math.random * 0.2).toFloat
 var age40_49num = de.format(math.random * 0.2).toFloat

 var age10_19 = age10_19num
 var age20_29 = age20_29num
 var age30_39 = age30_39num
 var age40_49 = age40_49num
 var age50_59 = de.format(1 - age10_19num - age20_29num - age30_39num - age40_49num).toFloat

 var game = de.format(math.random * 1).toFloat
 var movie = de.format(math.random * 1).toFloat
 var music = de.format(math.random * 1).toFloat
 var art = de.format(math.random * 1).toFloat
 var politics_news = de.format(math.random * 1).toFloat

 var financial = de.format(math.random * 1).toFloat
 var education_training = de.format(math.random * 1).toFloat
 var health_care = de.format(math.random * 1).toFloat
 var travel = de.format(math.random * 1).toFloat
 var automobile = de.format(math.random * 1).toFloat

 var house_property = de.format(math.random * 1).toFloat
 var clothing_accessories = de.format(math.random * 1).toFloat
 var beauty = de.format(math.random * 1).toFloat
 var IT = de.format(math.random * 1).toFloat
 var baby_Product = de.format(math.random * 1).toFloat

 var food_service = de.format(math.random * 1).toFloat
 var home_furnishing = de.format(math.random * 1).toFloat
 var sports = de.format(math.random * 1).toFloat
 var outdoor_activities = de.format(math.random * 1).toFloat
 var medicine = de.format(math.random * 1).toFloat

 "{" + "\"man\"" + ":" + man + "," + "\"woman\"" + ":" + woman + "," + "\"age10_19\"" + ":" + age10_19 + "," + "\"age20_29\"" + ":" + age20_29 + "," +
  "\"age30_39\"" + ":" + age30_39 + "," + "\"age40_49\"" + ":" + age40_49 + "," + "\"age50_59\"" + ":" + age50_59 + "," + "\"game\"" + ":" + game + "," +
  "\"movie\"" + ":" + movie + "," + "\"music\"" + ":" + music + "," + "\"art\"" + ":" + art + "," + "\"politics_news\"" + ":" + politics_news + "," +
  "\"financial\"" + ":" + financial + "," + "\"education_training\"" + ":" + education_training + "," + "\"health_care\"" + ":" + health_care + "," +
  "\"travel\"" + ":" + travel + "," + "\"automobile\"" + ":" + automobile + "," + "\"house_property\"" + ":" + house_property + "," + "\"clothing_accessories\"" + ":" + clothing_accessories + "," +
  "\"beauty\"" + ":" + beauty + "," + "\"IT\"" + ":" + IT + "," + "\"baby_Product\"" + ":" + baby_Product + "," + "\"food_service\"" + ":" + food_service + "," +
  "\"home_furnishing\"" + ":" + home_furnishing + "," + "\"sports\"" + ":" + sports + "," + "\"outdoor_activities\"" + ":" + outdoor_activities + "," + "\"medicine\"" + ":" + medicine +
  "}";

 }

 def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
 val path = s"$REP_HOME/appstatistic"
 ctx.read.parquet(path)
  .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'")
 }

}

以上这篇DataFrame:通过SparkSql将scala类转为DataFrame的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换实例

    代码如下,步骤流程在代码注释中可见: # -*- coding: utf-8 -*- import pandas as pd from pyspark.sql import SparkSession from pyspark.sql import SQLContext from pyspark import SparkContext #初始化数据 #初始化pandas DataFrame df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], index=['row1

  • 浅谈DataFrame和SparkSql取值误区

    1.DataFrame返回的不是对象. 2.DataFrame查出来的数据返回的是一个dataframe数据集. 3.DataFrame只有遇见Action的算子才能执行 4.SparkSql查出来的数据返回的是一个dataframe数据集. 原始数据 scala> val parquetDF = sqlContext.read.parquet("hdfs://hadoop14:9000/yuhui/parquet/part-r-00004.gz.parquet") df: or

  • pandas把dataframe转成Series,改变列中值的类型方法

    使用 pd.Series把dataframe转成Series ts = pd.Series(df['Value'].values, index=df['Date']) 使用astype改变列中的值的类型,注意前面要有np df['列名'] = df['列名'].astype(np.int64) 以上这篇pandas把dataframe转成Series,改变列中值的类型方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们. 您可能感兴趣的文章: python panda

  • python dataframe astype 字段类型转换方法

    使用astype实现dataframe字段类型转换 # -*- coding: UTF-8 -*- import pandas as pd df = pd.DataFrame([{'col1':'a', 'col2':'1'}, {'col1':'b', 'col2':'2'}]) print df.dtypes df['col2'] = df['col2'].astype('int') print '-----------' print df.dtypes df['col2'] = df['c

  • python读取文本中数据并转化为DataFrame的实例

    在技术问答中看到一个这样的问题,感觉相对比较常见,就单开一篇文章写下来. 从纯文本格式文件 "file_in"中读取数据,格式如下: 需要输出成"file_out",格式如下: 数据的原格式是"类别:内容",以空行"\n"为分条目,转换后变成一个条目一行,按照类别顺序依次写出内容. 建议读取后,使用pandas,把数据建立称DataFrame的表格.这样方便以后处理数据.但是原格式并不是通常的表格格式,所以要先做一些简单的处理

  • spark: RDD与DataFrame之间的相互转换方法

    DataFrame是一个组织成命名列的数据集.它在概念上等同于关系数据库中的表或R/Python中的数据框架,但其经过了优化.DataFrames可以从各种各样的源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD. DataFrame API 可以被Scala,Java,Python和R调用. 在Scala和Java中,DataFrame由Rows的数据集表示. 在Scala API中,DataFrame只是一个类型别名Dataset[Row].而在Java API中,用户需要

  • pandas 实现字典转换成DataFrame的方法

    把dictd = {'A':0}转换成DataFrame, 首先,DataFrame的语法格式应为: import pandas as pd df = pd.DataFrame({'A':[0]})#'A'是columns,对应的是list 输出: A 0 0 但是如果是: df = pd.DataFrame({'A':0})#直接输入dict 会报错 ValueError: If using all scalar values, you must pass an index 解决办法1: 指定

  • DataFrame:通过SparkSql将scala类转为DataFrame的方法

    如下所示: import java.text.DecimalFormat import com.alibaba.fastjson.JSON import com.donews.data.AppConfig import com.typesafe.config.ConfigFactory import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, Da

  • SparkSQL使用IDEA快速入门DataFrame与DataSet的完美教程

    目录 1.使用IDEA开发Spark SQL 1.1创建DataFrame/DataSet 1.1.1指定列名添加Schema 1.1.2StructType指定Schema 1.1.3反射推断Schema 1.使用IDEA开发Spark SQL 1.1创建DataFrame/DataSet 1.指定列名添加Schema 2.通过StrucType指定Schema 3.编写样例类,利用反射机制推断Schema 1.1.1指定列名添加Schema //导包 import org.apache.sp

  • DataFrame 将某列数据转为数组的方法

    如下所示: playerIds =salaries_2016['playerID'].tolist() data['列名'].tolist() 以上这篇DataFrame 将某列数据转为数组的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们. 您可能感兴趣的文章: python读取文本中数据并转化为DataFrame的实例 pandas修改DataFrame列名的方法 pandas系列之DataFrame 行列数据筛选实例 Python将DataFrame的某一列

  • pandas DataFrame数据转为list的方法

    首先使用np.array()函数把DataFrame转化为np.ndarray(),再利用tolist()函数把np.ndarray()转为list,示例代码如下: # -*- coding:utf-8-*- import numpy as np import pandas as pd data_x = pd.read_csv("E:/Tianchi/result/features.csv",usecols=[2,3,4])#pd.dataframe data_y = pd.read_

  • python pymysql链接数据库查询结果转为Dataframe实例

    我就废话不多说了,大家还是直接看代码吧! import pymysql import pandas as pd def con_sql(db,sql): # 创建连接 db = pymysql.connect(host='127.0.0.1', port=3308, user='name', passwd='password', db=db, charset='utf8') # 创建游标 cursor = db.cursor() cursor.execute(sql) result = curs

  • 深入学习java中的Groovy 和 Scala 类

    前言 Java 传承的是平台,而不是语言.有超过 200 种语言可以在 JVM 上运行,它们之中不可避免地会有一种语言最终将取代 Java 语言,成为编写 JVM 程序的最佳方式.本系列将探讨三种下一代 JVM 语言:Groovy.Scala 和 Clojure,比较并对比新的功能和范例,让 Java 开发人员对自己近期的未来发展有大体的认识. Java 语言的开发人员精通 C++ 和其他语言,包括多继承(multiple inheritance),使得类可以继承自任意数量的父类.多继承带来的一

  • Python Dataframe 指定多列去重、求差集的方法

    1)去重 指定多列去重,这是在dataframe没有独一无二的字段作为PK(主键)时,需要指定多个字段一起作为该行的PK,在这种情况下对整体数据进行去重. Attention:主要用到了drop_duplicates方法,并设置参数subset为多个字段名构成的数组. 具体代码如下: >>>import pandas as pd >>>data={'state':[1,1,2,2,1,2,2],'pop':['a','b','c','d','b','c','d']} &

  • python dataframe向下向上填充,fillna和ffill的方法

    首先新建一个dataframe: In[8]: df = pd.DataFrame({'name':list('ABCDA'),'house':[1,1,2,3,3],'date':['2010-01-01','2010-06-09','2011-12-03','2011-04-05','2012-03-23']}) In[9]: df Out[9]: date house name 0 2010-01-01 1 A 1 2010-06-09 1 B 2 2011-12-03 2 C 3 201

  • pandas.dataframe中根据条件获取元素所在的位置方法(索引)

    在dataframe中根据一定的条件,得到符合要求的某行元素所在的位置. 代码如下所示: df = pd.DataFrame({'BoolCol': [1, 2, 3, 3, 4],'attr': [22, 33, 22, 44, 66]}, index=[10,20,30,40,50]) print(df) a = df[(df.BoolCol==3)&(df.attr==22)].index.tolist() print(a) df如下所示,以上通过选取"BoolCol"取

随机推荐