Flink入门级应用域名处理示例

目录
  • 概述
  • 算子
    • FlatMap
    • KeyBy
  • Reduce
  • 连接socket测试
  • 连接kafka
    • 正式
    • 测试
  • 打包上传服务器

概述

最近做了一个小任务,要使用Flink处理域名数据,在4GB的域名文档中求出每个域名的顶级域名,最后输出每个顶级域名下的前10个子级域名。一个比较简单的入门级Flink应用,代码很容易写,主要用到的算子有FlatMap、KeyBy、Reduce。但是由于Maven打包问题,总是提示找不到入口类,卡了好久,最后也是成功解决了。

主体代码如下:

public class FlinkStreamingTopDomain {
    public static void main(String[] args) throws Exception{
        // 获取流处理运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取kafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = FlinkUtil.getKafkaConsumer("ahl_test1", "console-consumer-72096");
        // 从当前消费组下标开始读取
        kafkaConsumer.setStartFromEarliest();
        DataStreamSource text = env.addSource(kafkaConsumer);

        // 算子
        DataStream<Tuple2<String,String>> windowCount = text.flatMap(new FlatMap())
                .keyBy(0).reduce(new Reduce());
        //把数据打印到控制台
        windowCount.print()
                .setParallelism(16);//使用16个并行度
        //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("streaming topDomain calculate");
    }
}

算子

FlatMap

Flatmap是对一行字符进行处理的,官网上的解释如下

FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

其实和Hadoop的Map差不多,都是把一行字符串进行处理,得到我们想要的<key,value>,不同之处在于Map处理后得到的是<key,values[]>。即Hadoop的Map操作会按key自动的将value处理成数组的形式,而Flink的FlatMap算子只会把每行数据处理成key、value。

下面是我处理业务的FlatMap代码

    // FlatMap分割域名,并输出二元组<顶级域名,域名>
    public static class FlatMap implements FlatMapFunction<String, Tuple2<String,String>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, String>> out) throws Exception {
            String[] values = s.split("\\^");   // 按字符^分割
            if(values.length - 1 < 2) {
                return;
            }
            String domain = values[2];
            out.collect(new Tuple2<String,String>(ToolUtil.getTopDomain(domain),domain));
        }
    }

我这里把数据处理成了二元组形式,之后reduce也是对这个二元组进行处理。

KeyBy

先来看看官网的解释

KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.

This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

dataStream.keyBy(value -&gt; value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -&gt; value.f0) // Key by the first element of a Tuple

Attention:A type cannot be a key if:
    1.it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
    2.it is an array of any type.

keyBy会按照一个keySelector定义的方式进行哈希分区,会将一个流分成多个Partition,相同key的会被分在同一个分区,经过keyBy的流变成KeyedStream。

需要注意的有两点:

1.pojo类型作为key,必须重写hashcode()方法

2.数组类型不能作为key

Reduce

官网的解释如下

Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

reduce是进行”滚动“处理的,即reduce方法的第一个参数是当前已经得到的结果记为currentResult,第二个参数是当前要处理的<key,value>。流式计算会一条一条的处理数据,每处理完一条数据就得到新的currentResult。

业务处理代码如下

    // 拼接同一分区下的ip
    public static class Reduce implements ReduceFunction<Tuple2<String,String>>{
        @Override
        public Tuple2<String,String> reduce(Tuple2 t1, Tuple2 t2) throws Exception {
            String[] domains = t1.f1.toString().split("\\^");
            if(domains.length == 10){
                return t1;
            }
            t1.f1 = t1.f1.toString() + "^" + t2.f1.toString();
            System.out.println(t1.f1 );
            return t1;
        }
   }

连接socket测试

1.将主体代码里的kafka获取数据,改成socket获取数据

//        int port;
//        try {
//            ParameterTool parameterTool = ParameterTool.fromArgs(args);
//            port = parameterTool.getInt("port");
//        } catch (Exception e){
//            System.out.println("没有指定port参数,使用默认值1112");
//            port = 1112;
//        }

        // 连接socket获取输入数据
//        DataStreamSource<String> text = env.socketTextStream("192.168.3.221",port);

2.在服务器开启一个端口号:nc -l -p 1112

3.运行代码

4.服务器输入测试数据就可以实时的获取处理结果

连接kafka

正式

使用kafka命令创建主题

kafka-topics.sh --create --zookeeper IP1:2181 IP2:2181... --replication-factor 2 --partitions 16 --topic ahl_test

kafka建立topic需要先开启zookeeper

运行生产者jar包,用生产者读取数据

java -jar $jar包路径  $topic $path

测试

另外,还可以使用测试生产者实现和socket测试相同的效果

/kafka-console-producer.sh --broker-list slave3:9092 --topic ahl_test1

打包上传服务器

打包上传服务器注意不要使用idea提供的build方式,反正我使用build会一直报错找不到主类,即便我反编译jar包发现主类在里面,并且MF文件也有配置主类信息。这个问题卡了我很久,最后我使用mvn pakage的方式打包并运行成功,把我的打包插件贴出来帮助遇到和我相同问题的人

<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<!--							<createDependencyReducedPom>false</createDependencyReducedPom>-->
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer
										implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.ncs.flink.streaming.FlinkStreamingTopDomain</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

Flink运行指令为:

/home/soft/flink-1.12.0//bin/flink run -c com.ncs.flink.streaming.FlinkStreamingDomainJob /home/ahl/flink/situation-mapred-flink-0.0.1-SNAPSHOT.jar

或者可以访问Flink集群的8081端口,在提供的UI页面上传运行

以上就是Flink入门级应用域名处理示例的详细内容,更多关于Flink域名处理的资料请关注我们其它相关文章!

(0)

相关推荐

  • IDEA上运行Flink任务的实战教程

    欢迎访问我的GitHub https://github.com/zq2599/blog_demos 内容:所有原创文章分类汇总及配套源码,涉及Java.Docker.Kubernetes.DevOPS等: IDEA是常用的IDE,我们编写的flink任务代码如果能直接在IDEA运行,会给学习和开发带来很大便利,例如改完代码立即运行不用部署.断点.单步调试等: 环境信息 电脑:2019版13寸MacBook Pro,2.3 GHz 四核Intel Core i5,8 GB 2133 MHz LPD

  • Apache Hudi结合Flink的亿级数据入湖实践解析

    目录 1. 实时数据落地需求演进 2. 基于Spark+Hudi的实时数据落地应用实践 3. 基于Flink自定义实时数据落地实践 4. 基于Flink + Hudi的落地数据实践 5. 后续应用规划及展望 5.1 取代离线报表,提高报表实时性及稳定性 5.2 完善监控体系,提升落数据任务稳定性 5.3 落数据中间过程可视化探索 本次分享分为5个部分介绍Apache Hudi的应用与实践 1. 实时数据落地需求演进 实时平台上线后,主要需求是开发实时报表,即抽取各类数据源做实时etl后,吐出实时

  • Apache FlinkCEP 实现超时状态监控的步骤详解

    CEP - Complex Event Processing复杂事件处理. 订单下单后超过一定时间还未进行支付确认. 打车订单生成后超过一定时间没有确认上车. 外卖超过预定送达时间一定时限还没有确认送达. Apache FlinkCEP API CEPTimeoutEventJob FlinkCEP源码简析 DataStream和PatternStream DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter.Map等转换为另一个Da

  • Flink开发IDEA环境搭建与测试的方法

    一.IDEA开发环境 1.pom文件设置 <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.12</scala.version>

  • 解析Flink内核原理与实现核心抽象

    目录 一.环境对象 1.1 执行环境 StreamExecutionEnvironment LocalStreamEnvironment RemoteStreamEnvironment StreamContextEnvironment StreamPlanEnvironment ScalaShellStreamEnvironment 1.2 运行时环境 RuntimeEnvironment SavepointEnvironment 1.3 运行时上下文 StreamingRuntimeConte

  • Flink入门级应用域名处理示例

    目录 概述 算子 FlatMap KeyBy Reduce 连接socket测试 连接kafka 正式 测试 打包上传服务器 概述 最近做了一个小任务,要使用Flink处理域名数据,在4GB的域名文档中求出每个域名的顶级域名,最后输出每个顶级域名下的前10个子级域名.一个比较简单的入门级Flink应用,代码很容易写,主要用到的算子有FlatMap.KeyBy.Reduce.但是由于Maven打包问题,总是提示找不到入口类,卡了好久,最后也是成功解决了. 主体代码如下: public class

  • Flink 侧流输出源码示例解析

    目录 Flink 侧流输出源码解析 源码解析 TimestampedCollector#collect CountingOutput#collect BroadcastingOutputCollector#collect RecordWriterOutput#collect ProcessOperator#ContextImpl#output CountingOutput#collect BroadcastingOutputCollector#collect RecordWriterOutput

  • nginx配置二级域名的示例代码

    为了不让域名fangyuanxiaozhan.com闲置, 作者又买了个国内的虚拟主机(VPS)的ip为 111.230.254.173 , 用wordpress开了个博客网站, 由于vps的空间很大, 我就开了个私有网盘服务, 由于日常开发需要用到git, 但又不想公开代码, 我又开了个私有git服务 我的vps挂了三个服务, 分别是: WordPress搭建的博客服务, 运行于8000端口, 访问方式 http://fangyuanxiaozhan.com:8000 Gogs搭建的git服务

  • c#网站WebConfig中域名引用示例介绍

    在WebConfig中定义如下. 复制代码 代码如下: public class WebConfig { public static string ResourceServer = @"http://www.xxx.com/"; } 在前台页面中这样调用 复制代码 代码如下: <script src="<% =WebConfig.ResourceServer %>/js/jquery-ui-all-min-lastest.js" type=&quo

  • 修改.htaccess实现301域名重定向示例分享

    复制代码 代码如下: <IfModule mod_rewrite.c> RewriteEngineOnRewriteBase/#将www.jb51.net跳转到www.jb51.comRewriteCond%{HTTP_HOST}^www\.jb51\.net$[NC]RewriteRule^(.*)$http://www.jb51.net/$1[R=301,L]#将huanhang.net跳转到www.jb51.comRewriteCond%{HTTP_HOST}^huanhang\.net

  • 如何利用map实现Nginx允许多个域名跨域

    常见的 Nginx 配置允许跨域 server { listen 11111; server_name localhost; location ~ /xxx/xx { if ($request_method = 'OPTIONS') { return 204; } add_header Access-Control-Allow-Origin *; add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS'; add_header Ac

  • 在微信小程序中保存网络图片

    微信代码片段点这里, 该功能需要添加appid才能进行正常的测试. 在小程序的文档中我们得知,wx.saveImageToPhotosAlbum是用来保存图片到相册的. 但是仔细一看会发现这个接口的filePath参数只接受临时文件路径或永久文件路径,不支持网络图片路径,意味着我们不能直接调用这个接口.. 因此先需要把该文件下载至本地,使用 wx.downloadFile . 但值得注意的是小程序只可以跟指定的域名与进行网络通信,也就是说下载图片之前,我们需要先去微信公众者平台的开发设置里设置u

  • 解决Nginx 配置 proxy_pass 后 返回404问题

    一. Nginx 配置 proxy_pass 后 返回404问题 故障解决和定位 1.1. 问题 在一次生产涉及多次转发的配置中, 需求是下面的图: 在配置好了 proxy_pass 之后,请求 www.djx.com 直接返回 404,没有什么其他的异常. 但是我们直接请求后端 www.baidu.com 是正常响应的.这就很怪异的. 看日志请求也是转发到了 www.baidu.com 的.但是请求响应就是404. 1.2. 寻找问题原因 我们的默认的 Nginx的 proxy_set_hea

  • Nginx 运维之域名验证的方法示例

    各公众平台在配置接口域名时会验证开发者对域名的配置权, 生成随机的文本及字符串,让放置在域名根目录可以通过域名直接访问到即通过验证. 示例为验证域名 abc.com 可以通过根路由访问 6CysNYj8Hb.txt 响应体为字符串 01df2ddab4774ba2676a5563ccb79ffa. $ curl https://abc.com/6CysNYj8Hb.txt 01df2ddab4774ba2676a5563ccb79ffa 方案一 配置有 root 的 server,直接把随机文档

  • 阿里云OSS域名配置及简单上传的示例代码

    目前开发系统,附件文件一般都会使用第三方的存储空间来保存,一方面是为了开发者提供便利,另一方可以减轻系统的访问压力,下面介绍一下阿里云的OSS的一些简单配置和使用. 一.阿里云OSS配置 前提:你需要购买阿里云的OSS服务器,这里就不多介绍:你需要有一个备案域名,此处也不多介绍(本人使用的阿里云进行备案的域名). 1. 阿里云OSS配置域名 1)创建bucket 2)选择tpw-bucket的"域名管理",然后点击绑定用户域名: 3)创建域名: 此处创建二级域名进行绑定,如果你想简单,

随机推荐