大数据HelloWorld-Flink实现WordCount

所有的语言开篇都是Hello Word,数据处理引擎也有Hello Word。那就是Word Count。MR,Spark,Flink以来开篇第一个程序都是Word Count。那么今天Flink开始目标就是在本地调试出Word Count。

单机安装Flink

开始Flink之前先在本机尝试安装一下Flink,当然FLink正常情况下是部署的集群方式。作者比较穷,机器配置太低开不了几个虚拟机。所以只能先演示个单机的安装。

Apache Flink需要在Java1.8+以上的环境中运行 。

所以,先确保自己的JDK版本是1.8包含以上的。

Flink单机部署非常简单,只需安装下载安装即可。如果需要与Hadoop版本结合,那么下载相应的Hadoop关联版本即可。如果不与Hadoop结合就直接下载Scala版即可。我这里就直接下载了Scala2.11的相关版本。

点击进入Apache页面进行下载,大小约有283MB。

把下载下来的压缩包进行解压即可。

打开命令行直接执行

/bin/start-cluster.bat

进行启动。

浏览器打开 http://localhost:8081

至此在Windows10环境下即完成Flink的启动。

编写WordCount

因为Flink是由Scala进行开发的,而Scala是基于JVM的一种语言。所以最终也会转换为JAVA字节码文件,所以Flink程序可以由Java、Scala两种语言都可以进行开发。也可以同时开发。比如Java写一部分代码,Scala写另一部分代码。可以参考<Apache Flink利用Maven对Scala与Java进行混编>。

Flink官方提供快速生成工程的两种工具:SBT与Maven。由于作者比较熟悉Maven,( 或者说没用过SBT )。所以直接使用Maven快速创建一个工程。

Java版本

mvn archetype:generate                \
   -DarchetypeGroupId=org.apache.flink       \
   -DarchetypeArtifactId=flink-quickstart-java   \
   -DarchetypeVersion=1.8.0

Scala版本

mvn archetype:generate                \
   -DarchetypeGroupId=org.apache.flink       \
   -DarchetypeArtifactId=flink-quickstart-scala   \
   -DarchetypeVersion=1.8.0

按照提示输入相关信息,即可生成最终的项目。

├── pom.xml
└── src
  └── main
    ├── resources
    │  └── log4j.properties
    └── scala/java
      └── org
        └── myorg
          └── quickstart
            ├── BatchJob.scala
            └── StreamingJob.scala

把工程导入到IDEA中

如果使用Scala的话,那么需要安装Scala的插件。搜索安装同时需要把Scala语言包进行安装。

不知道如何操作可以联系我 微信公号<指尖数虫>。

package jar;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchJob {

	public static void main(String[] args) throws Exception {
		// set up the batch execution environment
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		//读取目录下的文件
		DataSource<String> data = env.readTextFile("/opt/Server_Packets/log/ServerLog_1_runtime.log");
		//把文件中的内容按照空格进行拆分为 word,1  1 是为了能够在下面进行计算.
		data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
				for (String word : s.split(" ")){
					collector.collect(new Tuple2<>(word,1));
				}
			}
		})
		// 按照元组中的第1位进行分组
		.groupBy(0)
		// 分组的元组的计算方式为 value +value 也就是刚才的 同样的词 把 1+1
		.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
			@Override
			public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t2, Tuple2<String, Integer> t1) throws Exception {
				return new Tuple2<>(t1.f0,t1.f1+ t2.f1);
			}
		})
		//输出结果
		.print();
	}
}

总结

以上所述是小编给大家介绍的大数据HelloWorld-Flink实现WordCount,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我们网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

(0)

相关推荐

  • 30个mysql千万级大数据SQL查询优化技巧详解

    1.对查询进行优化,应尽量避免全表扫描,首先应考虑在 where 及 order by 涉及的列上建立索引. 2.应尽量避免在 where 子句中对字段进行 null 值判断,否则将导致引擎放弃使用索引而进行全表扫描,如:select id from t where num is null可以在num上设置默认值0,确保表中num列没有null值,然后这样查询:select id from t where num=0 3.应尽量避免在 where 子句中使用!=或<>操作符,否则引擎将放弃使用

  • vue大数据表格卡顿问题的完美解决方案

    前言 vue渲染小数据挺快,大数据vue开始出现卡顿现象,本文讲给大家详细介绍关于vue大数据表格卡顿问题的解决方法 点我在线体验Demo(请用电脑查看) 亲测苹果电脑,chrome浏览器无卡顿现象,其它浏览器并未测试,如遇到卡顿请备注系统和浏览器,方便我后续优化,谢谢 先看一下效果,一共1000 X 100 = 10W个单元格基本感受不到卡顿,而且每个单元格点击可以编辑,支持固定头和固定列 项目源代码地址 Github (本地下载) 解决问题核心点:横向滚动加载,竖向滚动加载 项目背景 笔者最

  • Python3实现将本地JSON大数据文件写入MySQL数据库的方法

    本文实例讲述了Python3实现将本地JSON大数据文件写入MySQL数据库的方法.分享给大家供大家参考,具体如下: 最近导师给了一个yelp上的评论数据,数据量达到3.55个G,如果进行分析时直接使用本地文件,选择python来分析,那么效率是非常低的:另一方面使用SQL来储存文本文件最为安全,之前使用CSV,txt存储的文本文件最后莫名其妙地出现一些奇怪字符,导致读取数据分割时出现错乱.下面给出一个简单的代码,将本地JSON文件内容存入数据库. 说明:python版本为3.5,使用第三方库为

  • 大数据HelloWorld-Flink实现WordCount

    所有的语言开篇都是Hello Word,数据处理引擎也有Hello Word.那就是Word Count.MR,Spark,Flink以来开篇第一个程序都是Word Count.那么今天Flink开始目标就是在本地调试出Word Count. 单机安装Flink 开始Flink之前先在本机尝试安装一下Flink,当然FLink正常情况下是部署的集群方式.作者比较穷,机器配置太低开不了几个虚拟机.所以只能先演示个单机的安装. Apache Flink需要在Java1.8+以上的环境中运行 . 所以

  • 详解大数据处理引擎Flink内存管理

    内存模型 Flink可以使用堆内和堆外内存,内存模型如图所示: flink使用内存划分为堆内内存和堆外内存.按照用途可以划分为task所用内存,network memory.managed memory.以及framework所用内存,其中task network managed所用内存计入slot内存.framework为taskmanager公用. 堆内内存包含用户代码所用内存.heapstatebackend.框架执行所用内存. 堆外内存是未经jvm虚拟化的内存,直接映射到操作系统的内存地

  • 大数据Kafka:消息队列和Kafka基本介绍

    目录 一.什么是消息队列 二.消息队列的应用场景 异步处理 应用耦合 限流削峰 消息驱动系统 三.消息队列的两种方式 点对点模式 发布/订阅模式 四.常见的消息队列的产品 1) RabbitMQ 2) activeMQ: 3) RocketMQ 4) kafka 五.Kafka的基本介绍 一.什么是消息队列 消息队列,英文名:Message Queue,经常缩写为MQ.从字面上来理解,消息队列是一种用来存储消息的队列 .来看一下下面的代码 上述代码,创建了一个队列,先往队列中添加了一个消息,然后

  • IDEA 中使用 Big Data Tools 连接大数据组件

    目录 简介 安装 Big Data Tools 插件 Flink 配置(不推荐) Kafka 配置(推荐) HDFS 配置(推荐) 总结 简介 Big Data Tools 插件可用于 Intellij Idea 2019.2 及以后的版本.它提供了使用 Zeppelin,AWS S3,Spark,Google Cloud Storage,Minio,Linode,数字开放空间,Microsoft Azure 和 Hadoop 分布式文件系统(HDFS)来监视和处理数据的特定功能. 下面来看一下

  • SpringBoot+Thymeleaf+ECharts实现大数据可视化(基础篇)

    目录 0x01 新建SpringBoot项目 2. 编写HelloWorld程序代码 0x02 引入ECharts资源 1. 获取JQuery与ECharts资源 2. 新建ECharts模版html文件 3. 添加后台java代码 4. ECharts模版样式预览 0x03 SpringBoot整合Thymeleaf 1. 新建myECharts方法 2. 引入Thymeleaf 3. ECharts新样式预览 4. 模式升级 0xFF 总结 0x01 新建SpringBoot项目 1. 新建

  • Sqlserver 高并发和大数据存储方案

    随着用户的日益递增,日活和峰值的暴涨,数据库处理性能面临着巨大的挑战.下面分享下对实际10万+峰值的平台的数据库优化方案.与大家一起讨论,互相学习提高! 案例:游戏平台. 1.解决高并发 当客户端连接数达到峰值的时候,服务端对连接的维护与处理这里暂时不做讨论.当多个写请求到数据库的时候,这时候需要对多张表进行插入,尤其一些表 达到每天千万+的存储,随着时间的积累,传统的同步写入数据的方式显然不可取,经过试验,通过异步插入的方式改善了许多,但与此同时,对读取数据的实时性也需要做一定的牺牲. 异步的

  • Oracle的CLOB大数据字段类型操作方法

    一.Oracle中的varchar2类型 我们在Oracle数据库存储的字符数据一般是用VARCHAR2.VARCHAR2既分PL/SQL Data Types中的变量类型,也分Oracle Database中的字段类型,不同场景的最大长度不同. 在Oracle Database中,VARCHAR2 字段类型,最大值为4000:PL/SQL中 VARCHAR2 变量类型,最大字节长度为32767. 当 VARCHAR2 容纳不下我们需要存储的信息时,就出来的Oracle的大数据类型LOB( La

  • 如何将Oracle的一个大数据表快速迁移到 Sqlserver2008数据库(图文教程)

    oracle 服务器  版本  11.2.0.1.0 Sqlserver2008  R2 前提条件是 SQLSERVER服务器上安装了Oracle客户端并且进行了配置 不会配置的请参照 这个链接 1  登录MSSM 工具 2 选中其中一个数据库  右键⇒任务⇒导入数据 3   打开窗口 Sqlserver导入和导出向导   点击下一步 4 进入选择数据源画面 a: 数据源 选择  Microsoft OLE DB Provider for Oracle  然后 点击 右侧的 属性 按钮 5  数

  • 大数据量时提高分页的效率

    如我们在之前的教程里讨论的那样,分页可以通过两种方法来实现: 默认分页– 你仅仅只用选中data Web control的 智能标签的Enable Paging ; 然而,当你浏览页面的时候,虽然你看到的只是一小部分数据,ObjectDataSource 还是会每次都读取所有数据 自定义分页– 通过只从数据库读取用户需要浏览的那部分数据,提高了性能. 显然这种方法需要你做更多的工作. 默认的分页功能非常吸引人,因为你只需要选中一个checkbox就可以完成了.但是它每次都读取所有的数据,这种方式

  • java 文件大数据Excel下载实例代码

    java 文件大数据Excel下载实例代码 excel可以用xml表示.故可以以此来实现边写边下载文件 package com.tydic.qop.controller; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.I

随机推荐