SpringBoot超详细讲解集成Flink的部署与打包方法

目录
  • 一、SpringBoot集成Flink
  • 二、FlinkTask写法调整
  • 三、打包插件
  • 四、Flink的上传与运行
  • 总结

一、SpringBoot集成Flink

其实没什么特别的,就把Flink依赖的包在pom引入就行了。只是FlinkTask的写法要小调整下,把相关依赖交给spring管理就行。

然后如果放弃Flink的Dashboard端监控task执行相关信息,那也可以在SpringBoot的启动类里调用就行,但是可能出现task的相关对象没有注入,这种都是小问题(实际就是springboot启动完成再调用,或者通过自动任务调用。也可以在springBoot的入口类用@ComponentScan注解扫描flinkTask所在的目录)。

实际更潇洒一点的做法可以将flinkTask的信息存表,通过springBoot的接口调用restfullApi接口,接口里调用task,甚至可以做task的启停、线程监控(接口里开线程调用task)。

二、FlinkTask写法调整

@Component
@Slf4j
public class JianGongStopCarTask {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<JGParkingLotInfo> dataStream = env.addSource(new JGParkingLotInfoSource());
//       获取到数据之后转换格式  此处不做转换
        SingleOutputStreamOperator<JGParkingLotInfo> jgParkingLotInfoSingleOutputStreamOperator = dataStream.map(jgParkingLotInfo -> jgParkingLotInfo);
        jgParkingLotInfoSingleOutputStreamOperator.addSink(new SinkToMySQL());
        env.execute();
    }
}

PS:

实际就是@Component这个注解,然后这个示例里的JGParkingLotInfoSource、SinkToMySQL类都需要加这个注解。其他就没有什么调整了,然后如果还有其他依赖没有,使用hutool的SpringUtil进行get对象就行。

三、打包插件

<plugins>
      <!-- 编译插件 -->
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <!--  spring boot 项目打包
       <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
       </plugin>-->
      <!-- Flink打包方式一 -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.easylinkin.dc.olap.JianGongStopCarTask</mainClass>
            </manifest>
          </archive>
          <!-- 打包依赖 -->
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- flink打包方式二
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.3.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <artifactSet>
                <excludes>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>module-info.class</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/spring.handlers</resource>
                  <resource>reference.conf</resource>
                </transformer>
                <transformer
                  implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                  <resource>META-INF/spring.factories</resource>
                </transformer>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/spring.schemas</resource>
                </transformer>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.easylinkin.dc.olap.JianGongStopCarTask</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
      -->
      <!-- flink打包方式三
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.easylinkin.dc.olap.JianGongStopCarTask</mainClass>
              <useUniqueVersions>false</useUniqueVersions>
              <addClasspath>true</addClasspath>
              <classpathPrefix>./lib/</classpathPrefix>
            </manifest>
          </archive>
          <excludes>
            <exclude>module-info.class</exclude>
            <exclude>META-INF/*.SF</exclude>
            <exclude>META-INF/*.DSA</exclude>
            <exclude>META-INF/*.RSA</exclude>
            <exclude>com.google.code.findbugs:jsr305</exclude>
            <exclude>org.slf4j:*</exclude>
            <exclude>log4j:*</exclude>
          </excludes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>3.3.0</version>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/lib</outputDirectory>
              <excludeTransitive>false</excludeTransitive>
              <stripVersion>false</stripVersion>
            </configuration>
          </execution>
        </executions>
      </plugin>-->
    </plugins>

除了打成SpringBoot用springboot的插件打包,flinkTask的打包有3种方式,方式三适合lib包提前传到task指定的依赖存储目录。这样上传flinkTask就很小。

方式二是官方推荐FlinkTask的打包方式,地址:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/configuration/maven/

说一千道一万就是因为打包的META-INF下的MANIFEST.MF文件的内容有区别。springBoot项目的这个文件有自己的JarLauncher。

FlinkTask的jar这个文件内容

四、Flink的上传与运行

1、上传并命令运行

配置好flink环境,命令就是

flink run WordCount.jar

2、Flink管理大屏上传运行

点击“Submit”

总结

这样处理应该是最优雅的了,task的写法改动也小。

官方用的是方式二打包,实际我觉得要是依赖特多,用方式三打包,然后将依赖的jar传到flink运行环境flinkTask指定的目录下应该也不错(flinkTask的包就变小了)

flink的这个计算监控真香

所以基本还是就当springBoot集成flink的项目一样开发吧,在打包的时候注意切换插件就ok了。就分享到这里,up!

到此这篇关于SpringBoot超详细讲解集成Flink的部署与打包方法的文章就介绍到这了,更多相关SpringBoot Flink内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Flink入门级应用域名处理示例

    目录 概述 算子 FlatMap KeyBy Reduce 连接socket测试 连接kafka 正式 测试 打包上传服务器 概述 最近做了一个小任务,要使用Flink处理域名数据,在4GB的域名文档中求出每个域名的顶级域名,最后输出每个顶级域名下的前10个子级域名.一个比较简单的入门级Flink应用,代码很容易写,主要用到的算子有FlatMap.KeyBy.Reduce.但是由于Maven打包问题,总是提示找不到入口类,卡了好久,最后也是成功解决了. 主体代码如下: public class

  • 浅谈Flink容错机制之作业执行和守护进程

    一.作业执行容错 Flink 的错误恢复机制分为多个级别,即 Execution 级别的 Failover 策略和 ExecutionGraph 级别的 Job Restart 策略.当出现错误时,Flink 会先尝试触发范围小的错误恢复机制,如果仍处理不了才会升级为更大范围的错误恢复机制,具体可以看下面的序列图. 当 Task 发生错误,TaskManager 会通过 RPC 通知 JobManager,后者将对应 Execution 的状态转为 failed 并触发 Failover 策略.

  • Flink流处理引擎零基础速通之数据的抽取篇

    目录 一.CDC 二.常见CDC的比较 三.Flink CDC 四.Flink CDC支持的数据库 五.阿里实现的FlinkCDC使用示例 依赖引入 基于table 基于sql 总结 一.CDC CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC .但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术. 二.常见CDC的比较 常见的主要包括Fl

  • 解析Flink内核原理与实现核心抽象

    目录 一.环境对象 1.1 执行环境 StreamExecutionEnvironment LocalStreamEnvironment RemoteStreamEnvironment StreamContextEnvironment StreamPlanEnvironment ScalaShellStreamEnvironment 1.2 运行时环境 RuntimeEnvironment SavepointEnvironment 1.3 运行时上下文 StreamingRuntimeConte

  • Flink支持哪些数据类型?

    一.支持的数据类型 Flink 对可以在 DataSet 或 DataStream 中的元素类型进行了一些限制.这样做的原因是系统会分析类型以确定有效的执行策略. 1.Java Tuple 和 Scala Case类: 2.Java POJO: 3.基本类型: 4.通用类: 5.值: 6.Hadoop Writables; 7.特殊类型 二.Flink之Tuple类型 Tuple类型  Tuple 是flink 一个很特殊的类型 (元组类型),是一个抽象类,共26个Tuple子类继承Tuple 

  • SpringBoot超详细讲解集成Flink的部署与打包方法

    目录 一.SpringBoot集成Flink 二.FlinkTask写法调整 三.打包插件 四.Flink的上传与运行 总结 一.SpringBoot集成Flink 其实没什么特别的,就把Flink依赖的包在pom引入就行了.只是FlinkTask的写法要小调整下,把相关依赖交给spring管理就行. 然后如果放弃Flink的Dashboard端监控task执行相关信息,那也可以在SpringBoot的启动类里调用就行,但是可能出现task的相关对象没有注入,这种都是小问题(实际就是spring

  • SpringBoot超详细讲解多数据源集成

    目录 一.多数据源使用场景与弊端 1.场景 2.弊端 二.使用步骤 1.引入库 2.多数据源配置文件 3.多数据源配置类 4.使用 总结 一.多数据源使用场景与弊端 1.场景 业务系统跨数据库 数据转存(这个现在太low了,应该高级点都不用) 系统集成 2.弊端 跨库业务事务问题 service.dao不能重复注入数据源 二.使用步骤 1.引入库 <!-- 多数据源支持 --> <dependency> <groupId>com.baomidou</groupId

  • SpringBoot超详细讲解yaml配置文件

    目录 1.文件类型 A.properties配置文件类型 B.yaml 基本语法 数据类型 2.配置提示 1.文件类型 A.properties配置文件类型 同以前properties用法一样 B.yaml 简介: YAML 是 "YAML Ain't Markup Language"(YAML 不是一种标记语言)的递归缩写.在开发的这种语言时,YAML 的意思其实是:"Yet Another Markup Language"(仍是一种标记语言). 非常适合用来做以

  • SpringBoot超详细讲解自动配置原理

    目录 SpringBoot自动配置原理 SpringBoot特点 1.依赖管理 A.父项目做依赖管理 B.开发导入starter场景启动器 C.可以修改默认版本号 2.自动配置 A.自动配好Tomcat B.自动配好SpringMVC C.默认的包结构 D.各种配置拥有默认值 E.按需要加载所有自动配置项 SpringBoot自动配置原理 了解SpringBoot自动配置原理 1.SpringBoot特点 2.容器功能 3.自动配置原理入门 4.开发技巧 SpringBoot特点 1.依赖管理

  • SpringBoot超详细讲解Thymeleaf模板引擎

    Jsp是最早的模板技术,用来处理视图层的,用来做数据显示的模板 B S结构: B:浏览器:用来显示数据,发送请求,没有处理能力 发送一个请求,访问a.jsp,a.jsp在服务器端变成Servlet,在将输出的数据返回给浏览器,浏览器就可以看到结果数据,jsp最终翻译过来也是个html页面 模板技术你就可以把它们当成字符串的替换,比如说:这里{data}这里有一个字符串,你把它换成固定值其他值,但是这个替换有一些附加的功能,通过模板技术处理视图层的内容 第一个例子: pom.xml:Thymele

  • SpringBoot超详细讲解@Enable*注解和@Import

    目录 @Enable* 解决办法 解放方案一 解决方案二 解决方案三 @Import 1.导入Bean 2.导入配置类 3.导入ImportSelector实现类 4.导入ImportBeanDefinitionRegistrar实现类 @Enable* 创建一个主启动类 package com.you.boot; import com.you.config.EnableUser; import com.you.config.UserConfig; import org.springframew

  • SpringBoot超详细讲解@Value注解

    目录 一.非配置文件注入 1.注入普通字符串 2.注入JAVA系统变量 3.注入表达式 4.注入其他Bean属性 5.注入文件资源 6.注入URL资源 二.通过配置文件注入 1.注入普通字符串 2.注入基本类型 3.注入数组类型 4.注入List类型 5.注入Map类型 一.非配置文件注入 1.注入普通字符串 直接附在属性名上,在 Bean 初始化时,会赋初始值. @Value("admin") private String name; 2.注入JAVA系统变量 @Value(&quo

  • SpringBoot超详细讲解事务管理

    目录 1. 事务的定义 2. 事务的特性 3. 事务的隔离性 4. 事务管理 5. 示例 1. 事务的定义 事务是由 N 步数据库操作序列组成的逻辑执行单元,这系列操作要么全部执行,要么全部放弃执行. 2. 事务的特性 事务的 ACID 特性: 原子性:事务是应用中不可分割的最小执行体 一致性:事务执行的结果必须使得数据从一个一致性状态转变为另一个一致性状态 隔离性:各个事务的执行互不干扰,任何事务的内部操作对其他事务都是隔离的 持久性:事务一旦提交,对数据所做的任何修改都要记录到永久存储器中

  • 超详细讲解SpringBoot参数校验实例

    目录 使用传统方式的弊端 引入依赖 注解说明 一.对实体类进行校验 1.entity 2.controller 3.编写全局统一异常处理 二.针对单个参数进行校验 三.分组校验 1.entity 2.controller 四.自定义分组校验 1.entity 2.CustomSequenceProvider 3.controller 五.自定义校验 1.定义校验注解 2.实现注解 六.嵌套校验 七.快速失败 注意事项 总结 使用传统方式的弊端 public String addUser(User

  • SpringBoot超详细深入讲解底层原理

    目录 手写springboot Springboot项目 自动配置 小结 手写springboot 在日常开发中只需要引入下面的依赖就可以开发Servlet进行访问了. <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> 那这是怎么做到的呢?今天就来

随机推荐