Java 数据流之Broadcast State

一、BroadcastState 的介绍

广播状态(Broadcast State)是 Operator State 的一种特殊类型。如果我们需要将配置 、规则等低吞吐事件流广播到下游所有 Task 时,就可以使用 BroadcastState。下游的 Task 接收这些配置、规则并保存为 BroadcastState,所有Task 中的状态保持一致,作用于另一个数据流的计算中。
简单理解:一个低吞吐量流包含一组规则,我们想对来自另一个流的所有元素基于此规则进行评估。
场景:动态更新计算规则。

广播状态与其他操作符状态的区别在于:

  • 它有一个 map 格式,用于定义存储结构
  • 它仅对具有广播流和非广播流输入的特定操作符可用
  • 这样的操作符可以具有不同名称的多个广播状态

二、BroadcastState 操作流程

三、案例实现

  • 从端口读取Json数据作为事件流
  • 从Mysql读取数据作为广播流
  • 关联广播流和事件流
  • 匹配对应的用户信息
package cn.kgc.broadcast

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// (001,'tom',18,'北京',15830010002)
// 定义样例类 接受 MySQL的用户数据
case class BaseUserInfo(id:Long,name:String,age:Int,city:String,phone:Long)

// user_id、user_name、user_addrss、behaviour、url
// 输出数据类型
case class UserVisitInfo(id:Long,name:String,city:String,behaviour:String,url:String)

// 实现广播ProcessFunction
class MyBroadcastFunc extends BroadcastProcessFunction[String,(Long, BaseUserInfo),UserVisitInfo]{

  lazy val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])

  // 处理的是日志流中的每条数据
  override def processElement(value: String, ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#ReadOnlyContext, out: Collector[UserVisitInfo]): Unit = {
    // {"user_id":"001","ts":"2021-07-10 11:10:05","behaviour":"browse","url":"https://www.tb1.com/1.html"}
    val user_id = JSON.parseObject(value).getLong("user_id")
    val behaviour = JSON.parseObject(value).getString("behaviour")
    val url = JSON.parseObject(value).getString("url")

    val mapState = ctx.getBroadcastState(mapStateDes)
    val userInfo = mapState.get(user_id)

    out.collect(UserVisitInfo(user_id,userInfo.name,userInfo.city,behaviour,url))

  }

  // 处理的是广播流的每个值
  override def processBroadcastElement(value: (Long, BaseUserInfo), ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#Context, out: Collector[UserVisitInfo]): Unit = {
    val mapState: BroadcastState[Long, BaseUserInfo] = ctx.getBroadcastState(mapStateDes)
    mapState.put(value._1,value._2)
  }
}

class UserSourceFunc extends RichParallelSourceFunction[BaseUserInfo]{

  var conn:Connection = _
  var statement: PreparedStatement = _
  var flag:Boolean = true

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","liu911223")
    statement = conn.prepareStatement("select * from base_user")
  }

  override def run(ctx: SourceFunction.SourceContext[BaseUserInfo]): Unit = {
    while (flag){
      Thread.sleep(5000)
      val resultSet = statement.executeQuery()
      while (resultSet.next()){
        val id = resultSet.getLong(1)
        val name = resultSet.getString(2)
        val age = resultSet.getInt(3)
        val city = resultSet.getString(4)
        val phone = resultSet.getLong(5)
        ctx.collect(BaseUserInfo(id,name,age,city,phone))
      }
    }
  }

  override def cancel(): Unit = {
    flag = false
  }

  override def close(): Unit = {
    if (statement != null) statement.close()
    if (conn != null) conn.close()
  }
}
object BroadcastDemo01 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 定义为KV,一方面是为了广播的时候定义为map,另一方面是为了做关联操作
    val userBaseDS: DataStream[(Long, BaseUserInfo)] = env.addSource(new UserSourceFunc)
      .map(user => (user.id, user))
    val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
    val broadCastStream: BroadcastStream[(Long, BaseUserInfo)] = userBaseDS.broadcast(mapStateDes)

    // 日志JSON数据
    val dataInfoDS: DataStream[String] = env.socketTextStream("master",1314)

    dataInfoDS.connect(broadCastStream)
      .process(new MyBroadcastFunc)
      .print()

    env.execute()
  }
}

到此这篇关于Java 数据流之Broadcast State的文章就介绍到这了,更多相关Java Broadcast State内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java流操作之数据流实例代码

    实例1: package dataInputStreamAndPrintStreamDemo; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.PrintStream; //示范如何自键

  • java常用数据流应用实例解析

    这篇文章主要介绍了java常用数据流应用实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 按操作单位的不同分为:字节流(8bit)(InputStream.OuputStream).字符流(16bit)(Reader.Writer) 按数据流的流向不同分为:输入流.输出流 按角色的不同分为:节点流.处理流 一.不带缓冲的流 1.文件字节输入流.文件字节输出流 package anno; import java.io.File; impor

  • Java 数据流之Broadcast State

    一.BroadcastState 的介绍 广播状态(Broadcast State)是 Operator State 的一种特殊类型.如果我们需要将配置 .规则等低吞吐事件流广播到下游所有 Task 时,就可以使用 BroadcastState.下游的 Task 接收这些配置.规则并保存为 BroadcastState,所有Task 中的状态保持一致,作用于另一个数据流的计算中. 简单理解:一个低吞吐量流包含一组规则,我们想对来自另一个流的所有元素基于此规则进行评估. 场景:动态更新计算规则.

  • Java如何利用状态模式(state pattern)替代if else

    大多数开发人员现在还在使用if else的过程结构,曾看过jdon的banq大哥写的一篇文章,利用command,aop模式替代if else过程结构.当时还不太明白,这几天看了<重构>第一章的影片租赁案例,感触颇深.下面我来谈一谈为什么要用state pattern替代if else,替代if else有什么好处,以及给出详细代码怎么替代if else.本文参考jdon的"你还在使用if else吗?"及<重构>第一章. 首先我们模仿影片租赁过程,顾客租凭影片

  • Java常用数据流全面大梳理

    目录 缓冲流 转换流 标准输入输出流 打印流 数据流 对象流 随机存取文件流 Java NIO 缓冲流 为了提高数据读写的速度,Java API提供了带缓冲功能的流类,在使用这些流类时,会创建一个内部缓冲区数组,缺省使用8192个字节(8Kb)的缓冲区. 缓冲流要"套接"在相应的节点流之上,根据数据操作单位可以把缓冲流分为: BufferedInputStream 和 BufferedOutputStream BufferedReader 和 BufferedWriter 当读取数据时

  • Java WebService技术详解

    目录 WebService WebService简介 WebService原理 JAVA WebService规范 (1)JAX-WS: (2)JAXM&SAAJ: (3)JAX-RS: WebService入门案例 服务端的实现 客户端的实现 WSDL 文档结构 阅读方式 SOAP SOAP结构 UDDI Webservice的客户端调用方式 一:生成客户端调用方式 二:service编程调用方式 三:HttpURLConnection调用方式 使用注解修改WSDL内容 WebService

  • 详解Android4.4 RIL短信接收流程分析

    最近有客户反馈Android接收不到短信,于是一头扎进RIL里面找原因.最后发现不是RIL的问题,而是BC72上报 短信的格式不对,AT+CNMA=1无作用等几个小问题导致的.尽管问题不在RIL,但总算把RIL短信接收流程搞清楚了. 接收到新信息的log: D/ATC ( 1269): AT< +CMT:,27 D/ATC ( 1268): AT< 0891683108705505F0040d91683117358313f500009101329154922307ea31da2c36a301

  • 详解React中的组件通信问题

    引入 本来我是没想过总结这些东西的,会感觉比较入门.但是之前同学去腾讯面试问到了这个问题(react或vue的组件通信),我帮他整理,顺便写demo的过程中,会有一些新的体会,多总结还是有利于进步的呀. 父子组件 父 → 子 parent组件传给child组件,符合react的单向数据流理念,自上到下传递props. // 父组件 class Parent extends Component { constructor() { super(); this.state = { value: '',

  • Android中dumpsys命令用法简单介绍

    在Android手机上, 通过使用adb shell命令可以进入android系统的shell, 该shell除支持一些常用的标准命令之外,还支持一些和android系统相关的其他命令, 这些命令可以打印出系统当前的状态信息. dumpsys就是这样一个命令. 使用 adb shell 进入命令行, 敲入dumpsys, 打印出的信息超级多, 在我的nexus 5机器上,输出多达67000行. 首先从打印信息的开始部分看起: Currently running services: Surface

  • 浅谈Android客户端与服务器的数据交互总结

    前言: 本文总结了Android客户端与服务器进行交互时,采用RESTful API +Json的交互方式,针对不同的数据形式以及不同的解析方法,如有不足之处,欢迎指正. 温馨提示:本文适合有一定Android开发经验的人阅读,如有疑问,欢迎留言讨论. 先了解一下相关的基本概念. 1. Android客户端与服务器端通信方式 通信方式主要有HTTP和Socket. HTTP通信:即使用HTTP协议进行通信,工作原理是客户端向服务器端发送一条HTTP请求,服务器收到之后先解析客户端的请求,之后会返

  • Springboot项目打war包docker包找不到resource下静态资源的解决方案

    前一段时间遇到一个问题,是关于读取项目中文件资源的问题.我是一个maven工程 我把一张照片放到resource下面,然后在本地读取的时候可以读取到,但是一旦打成WAR包以后就总是包找不到文件资源错误.我的war包是springboot打的war包,是内嵌的tomcat所以不解压,然后系统去找路径的时候会发现是个WAR包,而图片在WAR包内,所以找不到. 为了解决这个问题,我走了好多弯路,一直在路径上花费时间. 一开始使用修改配置文件的方式: # 配置静态资源访问前缀 spring.mvc.st

  • 详解Glide4.0集成及使用注意事项

    Glide 4.0由Google的各种团队内部使用,4.0被认为是内部稳定的.但外部用户可能会发现内部尚未发现的问题.因此,将此作为RC发布.如果没有发现稳定性或API中的重大问题,预计不久之后就会发布非RC版本. 一.集成 1.project gradle repositories { mavenLocal() } 2.app gradle compile 'com.android.support:support-v4:25.3.1' compile 'com.github.bumptech.

随机推荐