Spark学习笔记Spark Streaming的使用

1. Spark Streaming

  • Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行处理
  • Spark Streaing中有一个最基本的抽象叫DStream(代理),本质上就是一系列连续的RDD,DStream其实就是对RDD的封装
  • DStream可以认为是一个RDD的工厂,该DStream里面生产都是相同业务逻辑的RDD,只不过是RDD里面要读取数据的不相同
  • 在一个批次的处理时间间隔里, DStream只产生一个RDD
  • DStream就相当于一个"模板", 我们可以根据这个"模板"来处理一段时间间隔之内产生的这个rdd,以此为依据来构建rdd的DAG

2. 当下比较流行的实时计算引擎

吞吐量 编程语言 处理速度 生态

Storm 较低 clojure 非常快(亚秒) 阿里(JStorm)

Flink 较高 scala 较快(亚秒) 国内使用较少

Spark Streaming 非常高 scala 快(毫秒) 完善的生态圈

3. Spark Streaming处理网络数据

//创建StreamingContext 至少要有两个线程 一个线程用于接收数据 一个线程用于处理数据
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
pairRetDS.print()
//开启流计算
ssc.start()
//优雅的关闭
ssc.awaitTermination()

4. Spark Streaming接收数据的两种方式(Kafka)

Receiver

  • 偏移量是由zookeeper来维护的
  • 使用的是Kafka高级的API(消费者的API)
  • 编程简单
  • 效率低(为了保证数据的安全性,会开启WAL)
  • kafka0.10的版本中已经彻底弃用Receiver了
  • 生产环境一般不会使用这种方式

Direct

  • 偏移量是有我们来手动维护
  • 效率高(我们直接把spark streaming接入到kafka的分区中了)
  • 编程比较复杂
  • 生产环境一般使用这种方式

5. Spark Streaming整合Kafka

基于Receiver的方式整合kafka(生产环境不建议使用,在0.10中已经移除了)

//创建StreamingContext 至少要有两个线程 一个线程用于接收数据 一个线程用于处理数据
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
val groupId = "myid"
val topics = Map("hadoop" -> 3)
val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
receiverDS.flatMap(_._2.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()

基于Direct的方式(生产环境使用)

//创建StreamingContext 至少要有两个线程 一个线程用于接收数据 一个线程用于处理数据
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
val topics = Set("hadoop")
val inputDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
inputDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()

6. 实时流计算的架构

1. 生成日志(模拟用户访问web应用的日志)

public class GenerateAccessLog {
  public static void main(String[] args) throws IOException, InterruptedException {
    //准备数据
    int[] ips = {123, 18, 123, 112, 181, 16, 172, 183, 190, 191, 196, 120};
    String[] requesTypes = {"GET", "POST"};
    String[] cursors = {"/vip/112", "/vip/113", "/vip/114", "/vip/115", "/vip/116", "/vip/117", "/vip/118", "/vip/119", "/vip/120", "/vip/121", "/free/210", "/free/211", "/free/212", "/free/213", "/free/214", "/company/312", "/company/313", "/company/314", "/company/315"};

    String[] courseNames = {"大数据", "python", "java", "c++", "c", "scala", "android", "spark", "hadoop", "redis"};
    String[] references = {"www.baidu.com/", "www.sougou.com/", "www.sou.com/", "www.google.com"};
    FileWriter fw = new FileWriter(args[0]);
    PrintWriter printWriter = new PrintWriter(fw);
    while (true) {
      //      Thread.sleep(1000);
      //产生字段
      String date = new Date().toLocaleString();
      String method = requesTypes[getRandomNum(0, requesTypes.length)];
      String url = "/cursor" + cursors[getRandomNum(0, cursors.length)];
      String HTTPVERSION = "HTTP/1.1";
      String ip = ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)];
      String reference = references[getRandomNum(0, references.length)];
      String rowLog = date + " " + method + " " + url + " " + HTTPVERSION + " " + ip + " " + reference;
      printWriter.println(rowLog);
      printWriter.flush();
    }
  }

  //[start,end)
  public static int getRandomNum(int start, int end) {
    int i = new Random().nextInt(end - start) + start;
    return i;
  }
}

2. flume使用avro采集web应用服务器的日志数据

采集命令执行的结果到avro中

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
f1.sources = r1
f1.channels = c1
f1.sinks = k1

#define sources
f1.sources.r1.type = exec
f1.sources.r1.command =tail -F /logs/access.log

#define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100

#define sink 采集日志到uplooking03
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = uplooking03
f1.sinks.k1.port = 44444

#bind sources and sink to channel
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1
从avro采集到控制台
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = logger

#bind sources and sink to channel
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2
从avro采集到kafka中
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.topic = hadoop
f2.sinks.k2.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
f2.sinks.k2.requiredAcks = 1

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spark SQL数据加载和保存实例讲解

    一.前置知识详解 Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二.Spark SQL读写数据代码实战 import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD;

  • Spark自定义累加器的使用实例详解

    累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变.累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数. 累加器简单使用 Spark内置的提供了Long和Double类型的累加器.下面是一个简单的使用示例,在这个例子中我们在过滤掉RDD中奇数的同时进行计数,最后计算剩下整数的和. val sparkConf = new SparkConf().setAppName("Test").setM

  • spark rdd转dataframe 写入mysql的实例讲解

    dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍.spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称和sql语句,后台读取自己配置好的内容字段反射成一个class并利用出入的sql对实时数据进行

  • Spark的广播变量和累加器使用方法代码示例

    一.广播变量和累加器 通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本.这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传.在任务之间使用通用的,支持读写的共享变量是低效的.尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器. 1.1 广播变量: 广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量.广播变量可被用于有效地给每个节点一个大输入数据集的副

  • spark之Standalone模式部署配置详解

    spark运行模式 Spark 有很多种模式,最简单就是单机本地模式,还有单机伪分布式模式,复杂的则运行在集群中,目前能很好的运行在 Yarn和 Mesos 中,当然 Spark 还有自带的 Standalone 模式,对于大多数情况 Standalone 模式就足够了,如果企业已经有 Yarn 或者 Mesos 环境,也是很方便部署的. 1.local(本地模式):常用于本地开发测试,本地还分为local单线程和local-cluster多线程; 2.standalone(集群模式):典型的M

  • Spark整合Mongodb的方法

    Spark介绍 按照官方的定义,Spark 是一个通用,快速,适用于大规模数据的处理引擎. 通用性:我们可以使用Spark SQL来执行常规分析, Spark Streaming 来流数据处理, 以及用Mlib来执行机器学习等.Java,python,scala及R语言的支持也是其通用性的表现之一. 快速: 这个可能是Spark成功的最初原因之一,主要归功于其基于内存的运算方式.当需要处理的数据需要反复迭代时,Spark可以直接在内存中暂存数据,而无需像Map Reduce一样需要把数据写回磁盘

  • Spark学习笔记Spark Streaming的使用

    1. Spark Streaming Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行处理 Spark Streaing中有一个最基本的抽象叫DStream(代理),本质上就是一系列连续的RDD,DStream其实就是对RDD的封装 DStream可以认为是一个RDD的工厂,该DStream里面生产都是相同业务逻辑的RDD,只不过是RDD里面要读取数据的不相同 在一个批次的处理时间间隔里, DStream只产生一个RDD DSt

  • Spark学习笔记(一)Spark初识【特性、组成、应用】

    本文实例讲述了Spark基本特性.组成.应用.分享给大家供大家参考,具体如下: 一.官网介绍 1.什么是Spark 官网地址:http://spark.apache.org/ Apache Spark™是用于大规模数据处理的统一分析引擎. 从右侧最后一条新闻看,Spark也用于AI人工智能 spark是一个实现快速通用的集群计算平台.它是由加州大学伯克利分校AMP实验室 开发的通用内存并行计算框架,用来构建大型的.低延迟的数据分析应用程序.它扩展了广泛使用的MapReduce计算模型.高效的支撑

  • Spark学习笔记 (二)Spark2.3 HA集群的分布式安装图文详解

    本文实例讲述了Spark2.3 HA集群的分布式安装.分享给大家供大家参考,具体如下: 一.下载Spark安装包 1.从官网下载 http://spark.apache.org/downloads.html 2.从微软的镜像站下载 http://mirrors.hust.edu.cn/apache/ 3.从清华的镜像站下载 https://mirrors.tuna.tsinghua.edu.cn/apache/ 二.安装基础 1.Java8安装成功 2.zookeeper安装成功 3.hadoo

  • Spark学习笔记之Spark中的RDD的具体使用

    1. Spark中的RDD Resilient Distributed Datasets(弹性分布式数据集) Spark中的最基本的抽象 有了RDD的存在我们就可以像操作本地集合一样操作分布式的数据 包含所有元素的分区的集合 RDD包含了很多的分区 2. RDD中的弹性 RDD中的数据是可大可小的 RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘 RDD有自动容错功能,当其中一个RDD中的分区的数据丢失,或者当前节点故障时,rdd会根据依赖关系重新

  • Spark学习笔记之Spark SQL的具体使用

    1. Spark SQL是什么? 处理结构化数据的一个spark的模块 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用 2. Spark SQL的特点 多语言的接口支持(java python scala) 统一的数据访问 完全兼容hive 支持标准的连接 3. 为什么学习SparkSQL? 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执

  • angularjs学习笔记之简单介绍

    一.angularjs简介 AngularJS 是一个为动态WEB应用设计的结构框架.它能让你使用HTML作为模板语言,通过扩展HTML的语法,让你能更清楚.简洁地构建你的应用组件.它的创新点在于,利用 数据绑定 和 依赖注入,它使你不用再写大量的代码了.这些全都是通过浏览器端的Javascript实现,这也使得它能够完美地和任何服务器端技术结合. 说了这么多,估计你啥都没有理解...对吗?别着急,我来说说他的几个特点吧:模块化,数据双向绑定,依赖注入,指令.下面我们就跟着这几个特点进行学习.

  • AngularJS学习笔记之表单验证功能实例详解

    本文实例讲述了AngularJS学习笔记之表单验证功能.分享给大家供大家参考,具体如下: 一.执行基本的表单验证 <!DOCTYPE html> <html ng-app='exampleApp'> <head> <meta charset="UTF-8"> <title>表单</title> <script src="../../js/angular.min.js" type="

  • 正则表达式学习笔记

    正则表达式学习笔记 正则表达式(regular expression)描述了一种字符串匹配的模式,可以用来检查一个串是否含  有某种子串.将匹配的子串做替换或者从某个串中取出符合某个条件的子串等. 列目录时, dir *.txt或ls *.txt中的*.txt就不是一个正则表达式,因为这里*与正则式的*  的含义是不同的. 为便于理解和记忆,先从一些概念入手,所有特殊字符或字符组合有一个总表在后面,最后一  些例子供理解相应的概念. 正则表达式 是由普通字符(例如字符 a 到 z)以及特殊字符(

  • vue mint-ui学习笔记之picker的使用

    本文介绍了vue mint-ui picker的使用,分享给大家,也给自己留个学习笔记 Picker的使用 import { Picker } from 'mint-ui'; Vue.component(Picker.name, Picker); API 示例一:picker的简单使用 xxx.vue: <template> <div id="app"> <mt-picker :slots="slots" ></mt-pic

  • Go语言学习笔记之反射用法详解

    本文实例讲述了Go学习笔记之反射用法.分享给大家供大家参考,具体如下: 一.类型(Type) 反射(reflect)让我们能在运行期探知对象的类型信息和内存结构,这从一定程度上弥(mi)补了静态语言在动态行为上的不足.同时,反射还是实现元编程的重要手段. 和 C 数据结构一样,Go 对象头部并没有类型指针,通过其自身是无法在运行期获知任何类型相关信息的.反射操作所需要的全部信息都源自接口变量.接口变量除存储自身类型外,还会保存实际对象的类型数据. func TypeOf(i interface{

随机推荐