Spark网站日志过滤分析实例讲解

目录
  • 日志过滤
  • 日志分析

日志过滤

对于一个网站日志,首先要对它进行过滤,删除一些不必要的信息,我们通过scala语言来实现,清洗代码如下,代码要通过别的软件打包为jar包,此次实验所用需要用到的代码都被打好jar包,放到了/root/jar-files文件夹下:

package com.imooc.log
import com.imooc.log.SparkStatFormatJob.SetLogger
import com.imooc.log.util.AccessConvertUtil
import org.apache.spark.sql.{SaveMode, SparkSession}
/*
数据清洗部分
*/
object SparkStatCleanJob {
  def main(args: Array[String]): Unit = {
    SetLogger
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SparkStatCleanJob").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("/root/resources/access.log")
    accessRDD.take(4).foreach(println)
    val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),AccessConvertUtil.struct)
    accessDF.printSchema()
    //-----------------数据清洗存储到目标地址------------------------
    // coalesce(1)输出指定分区数的小文件
    accessDF.coalesce(1).write.format("parquet").partitionBy("day").mode(SaveMode.Overwrite).save("/root/clean")//mode(SaveMode.Overwrite)覆盖已经存在的文件  存储为parquet格式,按day分区
      //存储为parquet格式,按day分区
    /**
      * 调优点:
      *     1) 控制文件输出的大小: coalesce
      *     2) 分区字段的数据类型调整:spark.sql.sources.partitionColumnTypeInference.enabled
      *     3) 批量插入数据库数据,提交使用batch操作
      */
    spark.stop()
  }
}

过滤好的数据将被存放在/root/clean文件夹中,这部分已被执行好,后面直接使用就可以,其中代码开始的SetLogger功能在自定义类com.imooc.log.SparkStatFormatJob中,它关闭了大部分log日志输出,这样可以使界面变得简洁,代码如下:

def SetLogger() = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress", "false")
    Logger.getRootLogger().setLevel(Level.OFF);
  }

过滤中的AccessConvertUtil类内容如下所示:

object AccessConvertUtil {
  //定义的输出字段
  val struct = StructType(            //过滤日志结构
    Array(
      StructField("url", StringType), //课程URL
      StructField("cmsType", StringType), //课程类型:video / article
      StructField("cmsId", LongType), //课程编号
      StructField("traffic", LongType), //耗费流量
      StructField("ip", StringType), //ip信息
      StructField("city", StringType), //所在城市
      StructField("time", StringType), //访问时间
      StructField("day", StringType) //分区字段,天
    )
  )
  /**
    * 根据输入的每一行信息转换成输出的样式
    * 日志样例:2017-05-11 14:09:14     http://www.imooc.com/video/4500    304    218.75.35.226
    */
  def parseLog(log: String) = {
    try {
      val splits = log.split("\t")
      val url = splits(1)
      //http://www.imooc.com/video/4500
      val traffic = splits(2).toLong
      val ip = splits(3)
      val domain = "http://www.imooc.com/"
      //主域名
      val cms = url.substring(url.indexOf(domain) + domain.length)    //建立一个url的子字符串,它将从domain出现时的位置加domain的长度的位置开始计起
      val cmsTypeId = cms.split("/")
      var cmsType = ""
      var cmsId = 0L
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }      //以"/"分隔开后,就相当于分开了课程格式和id,以http://www.imooc.com/video/4500为例,此时cmsType=video,cmsId=4500
      val city = IpUtils.getCity(ip)         //从ip表中可以知道ip对应哪个城市
      val time = splits(0)
      //2017-05-11 14:09:14
      val day = time.split(" ")(0).replace("-", "")    //day=20170511
      //Row中的字段要和Struct中的字段对应
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(0)
    }
  }
  def main(args: Array[String]): Unit = {
      //示例程序:
    val url = "http://www.imooc.com/video/4500"
    val domain = "http://www.imooc.com/" //主域名
    val index_0 = url.indexOf(domain)
    val index_1 = index_0 + domain.length
    val cms = url.substring(index_1)
    val cmsTypeId = cms.split("/")
    var cmsType = ""
    var cmsId = 0L
    if (cmsTypeId.length > 1) {
      cmsType = cmsTypeId(0)
      cmsId = cmsTypeId(1).toLong
    }
    println(cmsType + "   " + cmsId)
    val time = "2017-05-11 14:09:14"
    val day = time.split(" ")(0).replace("-", "")
    println(day)
  }
}

执行完毕后clean文件夹下内容如图1所示:

日志分析

现在我们已经拥有了过滤好的日志文件,可以开始编写分析代码,例如实现一个按地市统计主站最受欢迎的TopN课程

package com.imooc.log
import com.imooc.log.SparkStatFormatJob.SetLogger
import com.imooc.log.dao.StatDAO
import com.imooc.log.entity.{DayCityVideoAccessStat, DayVideoAccessStat, DayVideoTrafficsStat}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
object TopNStatJob2 {
  def main(args: Array[String]): Unit = {
    SetLogger
    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false") //分区字段的数据类型调整【禁用】
      .master("local[2]")
      .config("spark.sql.parquet.compression.codec","gzip")   //修改parquet压缩格式
      .appName("SparkStatCleanJob").getOrCreate()
    //读取清洗过后的数据
    val cleanDF = spark.read.format("parquet").load("/root/clean")
    //执行业务前先清空当天表中的数据
    val day = "20170511"
    import spark.implicits._
    val commonDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
    commonDF.cache()
    StatDAO.delete(day)
    cityAccessTopSata(spark, commonDF)     //按地市统计主站最受欢迎的TopN课程功能
    commonDF.unpersist(true)     //RDD去持久化,优化内存空间
    spark.stop()
  }
/*
 * 按地市统计主站最受欢迎的TopN课程
*/
def cityAccessTopSata(spark: SparkSession, commonDF: DataFrame): Unit = {
    //------------------使用DataFrame API完成统计操作--------------------------------------------
    import spark.implicits._
    val cityAccessTopNDF = commonDF
      .groupBy("day", "city", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)     //聚合
        cityAccessTopNDF.printSchema()
        cityAccessTopNDF.show(false)
     //-----------Window函数在Spark SQL中的使用--------------------
    val cityTop3DF = cityAccessTopNDF.select(       //Top3中涉及到的列
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
        .orderBy(cityAccessTopNDF("times").desc)).as("times_rank")
    ).filter("times_rank <= 3").orderBy($"city".desc, $"times_rank".asc)         //以city为一个partition,聚合times为times_rank,过滤出前三,降序聚合city,升序聚合times_rank
    cityTop3DF.show(false) //展示每个地市的Top3
     //-------------------将统计结果写入数据库-------------------
    try {
      cityTop3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })
        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
    }

其中保存统计时用到了StatDAO类的insertDayCityVideoAccessTopN()方法,这部分的说明如下:

def insertDayVideoTrafficsTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()      //JDBC连接MySQL
      connection.setAutoCommit(false) //设置手动提交
        //向day_video_traffics_topn_stat表中插入数据
      val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values(?,?,?)"
      pstmt = connection.prepareStatement(sql)
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch() //优化点:批量插入数据库数据,提交使用batch操作
      }
      pstmt.executeBatch() //执行批量处理
      connection.commit() //手工提交
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)          //释放连接
    }
  }

JDBC连接MySQL和释放连接用到了MySQLUtils中的方法

此外我们还需要在MySQL中插入表,用来写入统计数据,MySQL表已经设置好。

下面将程序和所有依赖打包,用spark-submit提交:

./spark-submit --class com.imooc.log.TopNStatJob2 --master spark://localhost:9000 /root/jar-files/sql-1.0-jar-with-dependencies.jar

执行结果:

Schema信息

TopN课程信息

各地区Top3课程信息

MySQL表中数据:

到此这篇关于Spark网站日志过滤分析实例讲解的文章就介绍到这了,更多相关Spark日志分析内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解如何使用Spark和Scala分析Apache访问日志

    安装 首先需要安装好Java和Scala,然后下载Spark安装,确保PATH 和JAVA_HOME 已经设置,然后需要使用Scala的SBT构建Spark如下: $ sbt/sbt assembly 构建时间比较长.构建完成后,通过运行下面命令确证安装成功: $ ./bin/spark-shell scala> val textFile = sc.textFile("README.md") // 创建一个指向 README.md 引用 scala> textFile.co

  • Spark网站日志过滤分析实例讲解

    目录 日志过滤 日志分析 日志过滤 对于一个网站日志,首先要对它进行过滤,删除一些不必要的信息,我们通过scala语言来实现,清洗代码如下,代码要通过别的软件打包为jar包,此次实验所用需要用到的代码都被打好jar包,放到了/root/jar-files文件夹下: package com.imooc.log import com.imooc.log.SparkStatFormatJob.SetLogger import com.imooc.log.util.AccessConvertUtil i

  • log4j2日志异步打印(实例讲解)

    log4j2支持日志的异步打印,日志异步输出的好处在于,使用单独的进程来执行日志打印的功能,可以提高日志执行效率,减少日志功能对正常业务的影响. 异步日志在程序的classpath需要加载disruptor-3.0.0.jar或者更高的版本. Asynchronous Loggers是一个新增特性在Log4j 2 ,可以实现完全异步也可以和同步混合使用,还可以只异步化Appender,以提升系统性能,官方数据显示混合没有完全异步化效果好. 1,完全异步模式: 这种异步日志方式,不需要修改原来的配

  • spring boot自定义log4j2日志文件的实例讲解

    背景:因为从 spring boot 1.4开始的版本就要用log4j2 了,支持的格式有json和xml两种格式,此次实践主要使用的是xml的格式定义日志说明. spring boot 1.5.8.RELEASE 引入log4j2的开发步骤如下: 1.首先把spring-boot-starter-web以及spring-boot-starter包下面的spring-boot-starter-logging排除,然后引入spring-boot-starter-log4j2包. <dependen

  • spring AOP自定义注解方式实现日志管理的实例讲解

    今天继续实现AOP,到这里我个人认为是最灵活,可扩展的方式了,就拿日志管理来说,用Spring AOP 自定义注解形式实现日志管理.废话不多说,直接开始!!! 关于配置我还是的再说一遍. 在applicationContext-mvc.xml中要添加的 <mvc:annotation-driven /> <!-- 激活组件扫描功能,在包com.gcx及其子包下面自动扫描通过注解配置的组件 --> <context:component-scan base-package=&qu

  • php中使用array_filter()函数过滤数组实例讲解

    在数组中元素的过滤上,有一种方法比较特殊,结合了回调函数的使用,通过键值来与函数进行对应.相信说到这里很多人对于这种函数方法已经很好奇了,它就是array_filter() 函数的使用.接下来我们对该函数的定义.语法.参数.返回值.实例进行全面的介绍,具体过滤方法展现如下. 1.定义 array_filter用回调函数处理数组中的各个元素. 重点在于过滤(而不是新增)某个元素,当你处理到一个元素时,如果返回了false,那么这个元素将会被过滤掉.PS:保持了原来的索引. 2.语法 array a

  • python爬取安居客二手房网站数据(实例讲解)

    是小打小闹 哈哈,现在开始正式进行爬虫书写首先,需要分析一下要爬取的网站的结构:作为一名河南的学生,那就看看郑州的二手房信息吧! 在上面这个页面中,我们可以看到一条条的房源信息,从中我们发现了什么,发现了连郑州的二手房都是这么的贵,作为即将毕业的学生狗惹不起啊惹不起 还是正文吧!!!由上可以看到网页一条条的房源信息,点击进去后就会发现: 房源的详细信息.OK!那么我们要干嘛呢,就是把郑州这个地区的二手房房源信息都能拿到手,可以保存到数据库中,用来干嘛呢,作为一个地理人,还是有点用处的,这次就不说

  • spark rdd转dataframe 写入mysql的实例讲解

    dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍.spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称和sql语句,后台读取自己配置好的内容字段反射成一个class并利用出入的sql对实时数据进行

  • MYSQL慢查询和日志实例讲解

    一.简介 开启慢查询日志,可以让MySQL记录下查询超过指定时间的语句,通过定位分析性能的瓶颈,才能更好的优化数据库系统的性能. 二.参数说明 slow_query_log 慢查询开启状态 slow_query_log_file 慢查询日志存放的位置(这个目录需要MySQL的运行帐号的可写权限,一般设置为MySQL的数据存放目录) long_query_time 查询超过多少秒才记录 三.设置步骤 1.查看慢查询相关参数 mysql> show variables like 'slow_quer

  • python爬虫实现爬取同一个网站的多页数据的实例讲解

    对于一个网站的图片.文字音视频等,如果我们一个个的下载,不仅浪费时间,而且很容易出错.Python爬虫帮助我们获取需要的数据,这个数据是可以快速批量的获取.本文小编带领大家通过python爬虫获取获取总页数并更改url的方法,实现爬取同一个网站的多页数据. 一.爬虫的目的 从网上获取对你有需要的数据 二.爬虫过程 1.获取url(网址). 2.发出请求,获得响应. 3.提取数据. 4.保存数据. 三.爬虫功能 可以快速批量的获取想要的数据,不用手动的一个个下载(图片.文字音视频等) 四.使用py

  • Java Vector和ArrayList的异同分析及实例讲解

    在线程中有两种常用的方法,能够通过数组实现相应的功能,但除此之外在区别上也是很明显的.本篇就其中的代表方法ArrayList和Vector进行比较分析,一个是非线程安全,另一个是线程安全.在进行相同和不同点的分析之后,带来二者的实例代码对比,帮助大家体会它们的异同. 1.相同点 (1)都是有序集合. (2)数据不允许重复. (3)都实现了list接口. (4)都是通过数组实现的. (5)数组进行复制.移动.代价比较高,因此,适合随机查询和遍历,不适合插入和删除. 2.不同点 (1)ArrayLi

随机推荐