Java lambda表达式实现Flink WordCount过程解析

这篇文章主要介绍了Java lambda表达式实现Flink WordCount过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

本篇我们将使用Java语言来实现Flink的单词统计。

代码开发

环境准备

导入Flink 1.9 pom依赖

<dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.7</version>
    </dependency>
  </dependencies>

构建Flink流处理环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定义source

每秒生成一行文本

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
      private boolean isCanal = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        // 每秒发送一行文本
        while (!isCanal) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCanal = true;
      }
    });

单词计算

// 3. 单词统计
    // 3.1 将文本行切分成一个个的单词
    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // 切分单词
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

    //3.2 将单词转换为一个个的元组
    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 按照单词进行分组
    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 对每组单词数量进行累加
    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();

参考代码

public class WordCount {
  public static void main(String[] args) throws Exception {
    // 1. 构建Flink流式初始化环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. 自定义source - 每秒发送一行文本
    DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
      private boolean isCanal = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        // 每秒发送一行文本
        while (!isCanal) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCanal = true;
      }
    });

    // 3. 单词统计
    // 3.1 将文本行切分成一个个的单词
    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // 切分单词
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

    //3.2 将单词转换为一个个的元组
    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 按照单词进行分组
    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 对每组单词数量进行累加
    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();

    env.execute("app");
  }
}

Flink对Java Lambda表达式支持情况

Flink支持Java API所有操作符使用Lambda表达式。但是,但Lambda表达式使用Java泛型时,就需要声明类型信息。

我们来看下上述的这段代码:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // 切分单词
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

之所以这里将所有的类型信息,因为Flink无法正确自动推断出来Collector中带的泛型。我们来看一下FlatMapFuntion的源代码

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

  /**
  * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
  * it into zero, one, or more elements.
  *
  * @param value The input value.
  * @param out The collector for returning result values.
  *
  * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
  *          to fail and may trigger recovery.
  */
  void flatMap(T value, Collector<O> out) throws Exception;
}

我们发现 flatMap的第二个参数是Collector<O>,是一个带参数的泛型。Java编译器编译该代码时会进行参数类型擦除,所以Java编译器会变成成:

void flatMap(T value, Collector out)

这种情况,Flink将无法自动推断类型信息。如果我们没有显示地提供类型信息,将会出现以下错误:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
  In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
  An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
  Otherwise the type has to be specified explicitly using type information.

这种情况下,必须要显示指定类型信息,否则输出将返回值视为Object类型,这将导致Flink无法正确序列化。

所以,我们需要显示地指定Lambda表达式的参数类型信息,并通过returns方法显示指定输出的类型信息

我们再看一段代码:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

为什么map后面也需要指定类型呢?

因为此处map返回的是Tuple2类型,Tuple2是带有泛型参数,在编译的时候同样会被查出泛型参数信息,导致Flink无法正确推断。

更多关于对Java Lambda表达式的支持请参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • lambda表达式解决java后台分组排序过程解析

    需求:按照起始日期查询出数据库里一段连续日期的住院信息. 问题:数据库里的住院信息可能不是完整的,也就是在给出的日期区间里只有若干天的数据,缺少某些日期的数据. 解决: 1.需要我们先按日期分组查出数据库里有的数据: 2.然后遍历日期,将不存在的日期以日期为key,value为null插入集合里: 3.对集合里的key即日期进行排序. 注:这里分组和排序都用JDK8的新特性lambda表达式 /** * * @param startTime 开始时间 * @param endTime 结束时间

  • java lambda表达式用法总结

    这篇文章主要介绍了java lamda表达式用法总结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.什么是函数式编程(百度百科上的解释) 2.为什么要使用函数式编程(有什么好处) 1.代码简洁,减少代码量 2.接近自然语言,容易理解 传统实现分组 List<Student> students; Map<String,List<Student>> maps = Maps.newHashMap(); for(Stude

  • Java JDK 1.8 lambda的用法详解

    具体代码如下所示: public class Student { private String id; private String name; private String age; private String address; public Student(String id, String name, String age, String address) { this.id = id; this.name = name; this.age = age; this.address = a

  • Java8 用Lambda表达式给List集合排序的实现

    Lambda用到了JDK8自带的一个函数式接口Comparator<T>. 准备一个Apple类 public class Apple { private int weight; private String color; public Apple(){} public Apple(int weight) { this.weight = weight; } public Apple(int weight, String color) { this.weight = weight; this.c

  • Java8中Lambda表达式使用和Stream API详解

    前言 Java8 的新特性:Lambda表达式.强大的 Stream API.全新时间日期 API.ConcurrentHashMap.MetaSpace.总得来说,Java8 的新特性使 Java 的运行速度更快.代码更少.便于并行.最大化减少空指针异常. 0x00. 前置数据 private List<People> peoples = null; @BeforeEach void before () { peoples = new ArrayList<>(); peoples

  • Java Lambda表达式与匿名内部类的联系和区别实例分析

    本文实例讲述了Java Lambda表达式与匿名内部类的联系和区别.分享给大家供大家参考,具体如下: 一 点睛 Lambda表达式与匿名内部类存在如下相同点: Lambda表达式与匿名内部类一样,都可以直接访问"effectively final"的局部变量,以及外部类的成员变量(包括实例变量和类变量). Lambda表达式创建的对象与匿名内部类生成的对象一样, 都可以直接调用从接口继承得到的默认方法. Lambda表达式与匿名内部类主要存在如下区别: 匿名内部类可以为任意接口创建实例

  • 详解Java中的Lambda表达式

    简介 Lambda表达式是Java SE 8中一个重要的新特性.lambda表达式允许你通过表达式来代替功能接口. lambda表达式就和方法一样,它提供了一个正常的参数列表和一个使用这些参数的主体(body,可以是一个表达式或一个代码块). Lambda表达式还增强了集合库. Java SE 8添加了2个对集合数据进行批量操作的包: java.util.function 包以及java.util.stream 包. 流(stream)就如同迭代器(iterator),但附加了许多额外的功能.

  • Java8与Scala中的Lambda表达式深入讲解

    前言 最近几年Lambda表达式风靡于编程界.很多现代编程语言都把它作为函数式编程的基本组成部分.基于JVM的编程语言如Scala.Groovy及Clojure把它作为关键部分集成在语言中.而如今,(最终)Java 8也加入了这个有趣的行列. Java8 终于要支持Lambda表达式!自2009年以来Lambda表达式已经在Lambda项目中被支持.在那时候,Lambda表达式仍被称为Java闭包.在我们进入一些代码示例以前,先来解释下为什么Lambda表达式在Java程序员中广受欢迎. 1.为

  • Java lambda表达式实现Flink WordCount过程解析

    这篇文章主要介绍了Java lambda表达式实现Flink WordCount过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 本篇我们将使用Java语言来实现Flink的单词统计. 代码开发 环境准备 导入Flink 1.9 pom依赖 <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>

  • Java Lambda 表达式源码解析

    Java Lambda 源码分析 问题: Lambda 表达式是什么?JVM 内部究竟是如何实现 Lambda 表达式的?为什么要这样实现? 一.基本概念 1.Lambda 表达式 下面的例子中,() -> System.out.println("1") 就是一个 Lambda 表达式.Java 8 中每一个 Lambda 表达式必须有一个函数式接口与之对应.Lambda 表达式就是函数式接口的一个实现. @Test public void test0() { Runnable

  • Java Lambda表达式之从集合到流

    从集合到流 现在我们用代码来具体表示对某一集合进行迭代操作,我们希望定义一个Contact类来表示联系人,并将ContactList中所有String类型的联系人姓名全部包装进Contact类中: List<Contact> contacts = new ArrayList<>(); contactList.forEach(new Consumer<String>() { @Override public void accept(String s) { Contact

  • Java Lambda 表达式详解及示例代码

    Java Lambda 表达式是 Java 8 引入的一个新的功能,可以说是模拟函数式编程的一个语法糖,类似于 Javascript 中的闭包,但又有些不同,主要目的是提供一个函数化的语法来简化我们的编码. Lambda 基本语法 Lambda 的基本结构为 (arguments) -> body,有如下几种情况: 参数类型可推导时,不需要指定类型,如 (a) -> System.out.println(a) 当只有一个参数且类型可推导时,不强制写 (), 如 a -> System.o

  • 详细分析Java Lambda表达式

    在了解Lambda表达式之前我们先来区分一下面向对象的思想和函数式编程思想的区别 面向对象的思想: 做一件事情,找一个能解决这个事情的对象,调用他的方法来解决 函数时编程思想: 只要能获取到结果,谁去做的都不重要,重视的是结果,不重视过程 使用Lambda表达式的目的是为了简化我们的代码 匿名内部类虽然也简化了我们的代码,但是Lambda比他更简单,而且语法也更加少 下面我用一段代码来演示一下二者的区别 public class Main { public static void main(St

  • java lambda 表达式中的双冒号的用法说明 ::

    双冒号运算就是Java中的[方法引用],[方法引用]的格式是 类名::方法名 注意是方法名哦,后面没有括号"()"哒.为啥不要括号,因为这样的是式子并不代表一定会调用这个方法.这种式子一般是用作Lambda表达式,Lambda有所谓懒加载嘛,不要括号就是说,看情况调用方法. 例如 表达式: person -> person.getAge(); 可以替换成 Person::getAge 表达式 () -> new HashMap<>(); 可以替换成 HashMa

  • 深入浅出理解Java Lambda表达式之四大核心函数式的用法与范例

    目录 1.四大核心函数式接口 1.1 Consumer<T> : 消费型接口 1.2 Supplier<T> : 供给型接口 1.3 Function<T, R> : 函数型接口 1.4 Predicate<T> : 断言型接口 2.方法引用 2.1 对象 :: 实例方法 2.2 类 :: 静态方法 2.3 类 :: 实例方法 3.构造器引用 4.数组引用 1.四大核心函数式接口 上一篇文章中说到了Lambda表达式中的基本语法,以及我们如何自定义函数式接口

  • Java Lambda表达式详解和实例

    简介 Lambda表达式是Java SE 8中一个重要的新特性.lambda表达式允许你通过表达式来代替功能接口. lambda表达式就和方法一样,它提供了一个正常的参数列表和一个使用这些参数的主体(body,可以是一个表达式或一个代码块). Lambda表达式还增强了集合库. Java SE 8添加了2个对集合数据进行批量操作的包: java.util.function 包以及 java.util.stream 包. 流(stream)就如同迭代器(iterator),但附加了许多额外的功能.

  • Java对象转json的方法过程解析

    这篇文章主要介绍了Java对象转json的方法过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1. jsonlib:个人感觉最麻烦的一个需要导入的包也多,代码也相对多一些. 2.Gson:google的 3.FastJson:阿里巴巴的,个人觉得这个比较好,而且据说这个也是性能最好一个. 下面就贴出三种写法的代码,读者可以任选其一去使用.关于demo里面所使用的jar包,可以自行去下载. Jsonlib: package json; i

  • Java Lambda表达式入门示例

    本文实例讲述了Java Lambda表达式.分享给大家供大家参考,具体如下: 一 点睛 Lambda表达式支持将代码块作为方法参数,Lambda表达式允许使用更简洁的代码来创建只有一个抽象方法的接口(函数式接口)的实例. Lambda表达式主要作用就是代替匿名内部类的繁琐语法. 它由三部分组成: 形参列表.形参列表允许省略形参类型.如果形参列表中只有一个参数,甚至连形参列表的圆括号也可以省略. 箭头(->),必须通过英文等号和大于符号组成. 代码块.如果代码块只有包含一条语句,Lambda表达式

随机推荐