Flink状态和容错源码解析

目录
  • 引言
  • 概述
  • State
    • Keyed State
      • 状态实例管理及数据存储
      • HeapKeyedStateBackend
      • RocksDBKeyedStateBackend
    • OperatorState
    • 上层封装
  • 总结

引言

  • 计算模型

    • DataStream基础框架
    • 事件时间和窗口
    • 状态和容错
  • 部署&调度
  • 存储体系
  • 底层支撑

Flink中提供了State(状态)这个概念来保存中间计算结果和缓存数据,按照不同的场景,Flink提供了多种不同类型的State,同时为了实现Exactly once的语义,Flink参考Chandy-Lamport算法实现了Asynchronous Barrier Snapshotting算法(简称ABS),本篇我们来了解Flink状态的底层实现及如何进行快照处理。

概述

Flink中提供了2种State,一种是Keyed State,是用在Keyed DataStream(即每条记录是有一个key)。即每个key对应有一个状态信息,用于数据处理场景中需要按key记录状态的场景,如风控场景下记录用户的状态,window操作中记录窗口的数据信息也是用keyed State来实现的。另一种是Operator State,即算子的状态,如在KafkaSource中记录消费的偏移量。本篇将从状态的管理、数据的存储和备份恢复等来介绍底层机制。

State

Keyed State

Keyed State按照存储数据类型的不同,分为如下几类

  • ValueState: 保存一个值,支持查询和更新
  • ListState: 保存一个元素列表
  • ReducingState: 保存一个单值,表示添加到状态的所有数据的聚合,有定义一个ReduceFunction来进行聚合处理
  • AggregatingState<IN, OUT>: 保留一个单值,也是添加到状态的所有数据的聚合,和ReducingState不同的是,输入数据和结果值的类型可以不同。
  • MapState: 维护的一个映射列表 创建这些状态时需要同时定义一个StateDescriptor,这里面定义一个状态的名字(唯一名称),通过StateDescriptor就可以来引用具体状态实例。

状态实例管理及数据存储

从Keyed State的定义来看,这里的关系应该是一个key到State的映射,而在使用时却没有看到这个key,另外具体的状态是如何保存的,本节我们来深入分析

从上图可以看出,各种不同的State,是通过KeyedStateStore来获取到的,KeyedStateStore只是一个代理类,其底层是调用KeyedStateBackend来负责具体的处理,具体的实现类有如下2个

  • HeapKeyedStateBackend: 基于内存的StateBackend,把数据保存在Java的Heap里面
  • RocksDBKeyedStateBackend: 状态数据保存在RocksDB中

下面我们从如下几个功能点来看具体各个StateBackend的实现

  • 各个实例的管理
  • 内部数据保存
  • 数据过期处理
  • 数据快照处理
  • 数据重分布

HeapKeyedStateBackend

基于内存的StateBackend,将数据保存在Java的Heap中,适合小数据状态场景

各个实例的管理

private final Map<String, StateTable<K, ?, ?>> registeredKVStates;

使用一个Map来保存各个不同的KeyedState,key为定义的名字,StateTable中存储的具体的数据。StateTable的内部在下面内部数据保存中介绍

  • 内部数据保存 内部数据存储是通过StateTable来管理的,里面定义了一个StateMap的数组来存储具体的数据,具体的数据通过计算key的KeyGroupIndex来确定把数据存放到哪个StateMap。

    • StateMap:存储数据,具体的实现有CopyOnWriteStateMap和CopyOnWriteSkipListStateMap2个。
    • KeyGroup:将key分组管理,这样方便在后续通过savepoint来恢复任务时如果调整了并行度,这样方便对key按KeyGroup重新分布。keyGroup的计算为将key做hash处理后按最大并行度(maxParallelism)取余
  • 数据过期处理 如果有设置了数据过期处理,这种生成的State会每个value都带上一个时间戳数据,如MapState是每个value都带一个时间戳,具体的实例类型也会不同,对应为如下几个
    • TtlValueState
    • TtlListState
    • TtlReducingState
    • TtlAggregatingState
    • TtlMapState 具体数据的清理处理通过TtlIncrementalCleanup类来实现
  • 数据快照处理 HeapKeyedStateBackend的snapshot处理由类HeapSnapshotStrategy来负责,调用的方法为asyncSnapshot(),只支持全量的快照处理。
  • 数据重分布 在实际的数据处理中由于并行度设置的不合理,在日常运维过程中会涉及到并行度的调整,算子的并行度调整后,那对key的分布也会进行调整,这样就会导致keyedState的数据进行重分布。Flink中引入了一个KeyGroup的概念,其对key做了个分组管理,KeyGroup的个数为最大并行度,具体实现为将key进行hash后然分类(hash值对KeyGroup数取余)到其中一个KeyGroup中。

RocksDBKeyedStateBackend

使用RocksDB来存储状态,这个State backend可以存储非常大的状态,如果超过了内存可以split到磁盘上。同样我们分如下几个阶段来了解相关具体的实现

  • 各个实例的管理 通过如下的Map来存储定义的各个状态信息,和前面HeapKeyedStateBackend类似,key为自己定义的名称
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
  • 内部数据保存 从上面可以看到每个key对应的value是RocksDbKvStateInfo,这个其中有如下2个属性 ColumnFamilyHandle:rocksdb库中的类,提供了一个handle对应到rocksdb的ColumnFamily。
public static class RocksDbKvStateInfo implements AutoCloseable {
        public final ColumnFamilyHandle columnFamilyHandle;
        public final RegisteredStateMetaInfoBase metaInfo;
}
  • 数据过期处理 通过RocksDB的CompactionFilter[1]功能来实现数据过期的处理,RocksDB项目中专门有个FlinkCompactionFilter的类用于Flink项目 CompactionFilter提供了一种在rocksdb进行compaction时候,根据自定义逻辑去删除/修改key/value对的方法。如根据业务的ttl属性删除过期keys。
  • 数据快照处理 RocksDBKeyedStateBackend支持2种快照处理方式,全量和增量,由于RocksDB底层是使用lsm树来进行存储的,所以比较方便实现增量数据的获取
  • 数据恢复处理 和前面HeapKeyedStateBackend的类似,这里就不再展开了。

OperatorState

接下面我们深入了解下OperatorState,在实际使用中如我们消费了kafka的数据需要记录kafka消费的offset,还有一些场景需要将一些信息分发到所有的任务。这里需要使用到一类新的状态处理,这种状态是与每个算子绑定的,Flink提供了如下3个类来支持

  • ListState:通过一个list来保存相关的状态信息,如果并行度调整了,其中的数据按新的并行度重新进行分布处理;
  • UnionListState: 保存数据时和ListState类似,但是在出错和从savepoint恢复数据时的策略会不一样,其会将所有的状态通过广播的方式发给下游的每个任务,然后下游的任务来选择自己需要的部分;
  • BroadcastState: 用于每个任务上的状态都要一样的场景,这个在数据恢复时是把状态复制到所有的任务。

类似KeyedState,这里也是通过一个Backend来管理对应的状态数据,其接口定义为:OperatorStateStore。其实现类目前只有一个:DefaultOperatorStateBackend,我们也通过以下几个部分来分别了解DefaultOperatorStateBackend的实现

  • 各个实例的管理
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

这里通过2个Map来分别管理ListState(含UnionListState,后面都统一使用ListState来代指)和BroadcastState。 2. 内部数据保存 ListState的实现类为PartitionableListState,底层通过一个ArrayList来保存数据。BroadcastState定义的接口是kv的,所以其实现类HeapBroadcastState使用Map来存储相应的数据。 3. 数据过期处理 operatorState不涉及到数据过期的处理 4. 数据快照处理 DefaultOperatorStateBackendSnapshotStrategy类来负责具体的快照处理,调用的方法为asyncSnapshot,分别对ListState和BroadcastState进行快照处理 5. 数据重分布 数据重分布的策略前面介绍各个State时已经介绍了,这里就不再重复介绍了。

上层封装

Flink中对状态的backend和checkpoint存储策略进行了封装定义。 StateBackend:定义一个Streaming应用的state如何在集群中的本地存储。不同的实现使用不同的数据结构来存储对应的状态。其具体实现有如下2个:

  • HashMapStateBackend:将状态保存在TaskManager的内存中(JVM heap),其底层KeyedStateBackend使用的是HeapKeyedStateBackend,OperatorStateBackend使用的是DefaultOperatorStateBackend
  • EmbeddedRocksDBStateBackend:状态保存在一个嵌入的Rockdb实例中,其底层实现KeyedStateBackend使用的是RocksDBKeyedStateBackend,OperatorStateBackend使用的和HashMapStateBackend一致,也是DefaultOperatorStateBackend 说明:原来的RocksDBStateBackend、FsStateBackend和MemoryStateBackend不建议使用了。 另外对checkpoint的存储,CheckpointStorage接口定义了在Streaming应用中StateBackend在容错性方面如何存储其状态数据。其实现有如下2个
  • JobManagerCheckpointStorage:将checkpoints状态数据存储到JobManager的内存中,savepoints保存到文件系统
  • FileSystemCheckpointStorage:将checkpoints状态存储到文件系统,如保存到HDFS上。

总结

本篇我们了解了Flink的相关的状态的内容和checkpoint的保存策略。

  • State分为KeyedState和OperatorState,KeyedState主要存储针对记录级别的状态,如window操作时的状态。OperatorState主要存储针对算子的状态,如消费kafka的offset信息等;每类状态分别提供了不同种类的状态类来支持不同场景的状态保存需求
  • Flink底层通过StateBackend来保存对应的状态数据,主要有通过内存保存和使用RocksDB保存这2类,另外在其具体实现中为了方便并行度调整后对状态的重新拆分处理,引入了KeyGroup的概念
  • 在用户使用层,对上述2种状态进行了封装,接口为:StateBackend,来管理KeyedState和OperatorState,另外将checkpoint的存储策略从原来的StateBackend中拆分出来。目前支持有基于内存和外部存储的2类checkpoint存储策略。 最后由于checkpoint的触发以及任务恢复的处理与整体计算处理比较紧密,这块等介绍完Flink部署模式后再来详细梳理checkpoint的处理过程。

参考文档

以上就是Flink状态和容错源码解析的详细内容,更多关于Flink状态容错的资料请关注我们其它相关文章!

(0)

相关推荐

  • 详解Flink同步Kafka数据到ClickHouse分布式表

    目录 引言 什么是ClickHouse? 创建复制表 通过jdbc写入 引言 业务需要一种OLAP引擎,可以做到实时写入存储和查询计算功能,提供高效.稳健的实时数据服务,最终决定ClickHouse 什么是ClickHouse? ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS). 列式数据库更适合于OLAP场景(对于大多数查询而言,处理速度至少提高了100倍),下面详细解释了原因(通过图片更有利于直观理解),图片来源于ClickHouse中文官方文档. 行式 列

  • Flink JobGraph生成源码解析

    目录 引言 概念 JobGraph生成 生成hash值 生成chain 总结 引言 在DataStream基础中,由于其中的内容较多,只是介绍了JobGraph的结果,而没有涉及到StreamGraph到JobGraph的转换过程.本篇我们来介绍下JobGraph的生成的详情,重点是Operator可以串联成Chain的条件 概念 首先我们来回顾下JobGraph中的相关概念 JobVertex:job的顶点,即对应的计算逻辑(这里用的是Vertex, 而前面用的是Node,有点差异),通过in

  • Flink部署集群整体架构源码分析

    目录 概览 部署模式 Application mode 客户端提交请求 服务端启动&提交Application session mode Cluster架构 Cluster的启动流程 DispatcherResourceManagerComponent Runner代码 HA代码框架 总结 概览 本篇我们来了解Flink的部署模式和Flink集群的整体架构 部署模式 Flink支持如下三种运行模式 运行模式 描述 Application Mode Flink Cluster只执行提交的整个job

  • Flink DataStream基础框架源码分析

    目录 引言 概览 深入DataStream DataStream 属性和方法 类体系 Transformation 属性和方法 类体系 StreamOperator 属性和方法 类体系 Function DataStream生成提交执行的Graph StreamGraph 属性和方法 StreamGraph生成 JobGraph 属性和方法 总结 引言 希望通过对Flink底层源码的学习来更深入了解Flink的相关实现逻辑.这里新开一个Flink源码解析的系列来深入介绍底层源码逻辑.说明:这里默

  • Apache Hudi结合Flink的亿级数据入湖实践解析

    目录 1. 实时数据落地需求演进 2. 基于Spark+Hudi的实时数据落地应用实践 3. 基于Flink自定义实时数据落地实践 4. 基于Flink + Hudi的落地数据实践 5. 后续应用规划及展望 5.1 取代离线报表,提高报表实时性及稳定性 5.2 完善监控体系,提升落数据任务稳定性 5.3 落数据中间过程可视化探索 本次分享分为5个部分介绍Apache Hudi的应用与实践 1. 实时数据落地需求演进 实时平台上线后,主要需求是开发实时报表,即抽取各类数据源做实时etl后,吐出实时

  • Flink作业Task运行源码解析

    目录 引言 概览 调度框架 JobMaster ScheduleNG TaskExecutor Task 计算框架 算子计算处理 总结 引言 上一篇我们分析了Flink部署集群的过程和作业提交的方式,本篇我们来分析下,具体作业是如何被调度和计算的.具体分为2个部分来介绍 作业运行的整体框架,对相关的重要角色有深入了解 计算流程,重点是如何调度具体的operator机制 概览 首先我们来了解下整体的框架 JobMaster: 计算框架的主节点,负责运行单个JobGraph,包括任务的调度,资源申请

  • Flink时间和窗口逻辑处理源码分析

    目录 概览 时间 重要类 WatermarkStrategy WatermarkGenerator TimerService 处理逻辑 窗口 重要类 Window WindowAssigner Triger Evictor WindowOperator InternalAppendingState 处理逻辑 总结 概览 计算模型 DataStream基础框架 事件时间和窗口 部署&调度 存储体系 底层支撑 在实时计算处理时,需要跟时间来打交道,如实时风控场景的时间行为序列,实时分析场景下的时间窗

  • Flink状态和容错源码解析

    目录 引言 概述 State Keyed State 状态实例管理及数据存储 HeapKeyedStateBackend RocksDBKeyedStateBackend OperatorState 上层封装 总结 引言 计算模型 DataStream基础框架 事件时间和窗口 状态和容错 部署&调度 存储体系 底层支撑 Flink中提供了State(状态)这个概念来保存中间计算结果和缓存数据,按照不同的场景,Flink提供了多种不同类型的State,同时为了实现Exactly once的语义,F

  • Eureka源码解析服务离线状态变更

    目录 环境 1. 服务离线的方式 1.1 基于Actuator监控器实现 1.2 直接向Eureka Server提交请求 1.3 特殊状态CANCEL_OVERRIDE 2. 服务下架源码 2.1 cancelScheduledTasks() 2.2 unregister() 3. 服务下线源码分析(状态变更) 3.1 变更状态 3.2 获取状态 环境 eureka版本:1.10.11 Spring Cloud : 2020.0.2 Spring Boot :2.4.4测试代码:github.

  • 详解vue mint-ui源码解析之loadmore组件

    本文介绍了vue mint-ui源码解析之loadmore组件,分享给大家,具体如下: 接入 官方接入文档mint-ui loadmore文档 接入使用Example html <div id="app"> <mt-loadmore :top-method="loadTop" :bottom-method="loadBottom" :bottom-all-loaded="allLoaded" :max-dis

  • Redis源码解析:集群手动故障转移、从节点迁移详解

    一:手动故障转移 Redis集群支持手动故障转移.也就是向从节点发送"CLUSTER  FAILOVER"命令,使其在主节点未下线的情况下,发起故障转移流程,升级为新的主节点,而原来的主节点降级为从节点. 为了不丢失数据,向从节点发送"CLUSTER  FAILOVER"命令后,流程如下: a:从节点收到命令后,向主节点发送CLUSTERMSG_TYPE_MFSTART包:          b:主节点收到该包后,会将其所有客户端置于阻塞状态,也就是在10s的时间内

  • BootStrap Tooltip插件源码解析

    Tooltip插件可以让你把要显示的内容以弹出框的形式来展示,如: 因为自己在工作的过程中,用到了Tooltip这个插件,并且当时正想学习一下元素定位的问题,如:提示框显示的位置就是触发提示框元素的位置,可以配置在上.下.左.右等位置,所以就去看了源码.对于整个插件源码没有看全,但也学到了许多的知识点.能力有限,可能其中有认识错误的地方,以后再补充吧 1 使用方法不介绍 ,可以参照 Bootstrap 提示工具(Tooltip)插件 2 源码解析 +function ($) { 'use str

  • 浅谈Vuejs中nextTick()异步更新队列源码解析

    vue官网关于此解释说明如下: vue2.0里面的深入响应式原理的异步更新队列 官网说明如下: 只要观察到数据变化,Vue 将开启一个队列,并缓冲在同一事件循环中发生的所有数据改变.如果同一个 watcher 被多次触发,只会一次推入到队列中.这种在缓冲时去除重复数据对于避免不必要的计算和 DOM 操作上非常重要.然后,在下一个的事件循环"tick"中,Vue 刷新队列并执行实际(已去重的)工作.Vue 在内部尝试对异步队列使用原生的 Promise.then 和 MutationOb

  • Android源码解析之截屏事件流程

    今天这篇文章我们主要讲一下Android系统中的截屏事件处理流程.用过android系统手机的同学应该都知道,一般的android手机按下音量减少键和电源按键就会触发截屏事件(国内定制机做个修改的这里就不做考虑了).那么这里的截屏事件是如何触发的呢?触发之后android系统是如何实现截屏操作的呢?带着这两个问题,开始我们的源码阅读流程. 我们知道这里的截屏事件是通过我们的按键操作触发的,所以这里就需要我们从android系统的按键触发模块开始看起,由于我们在不同的App页面,操作音量减少键和电

随机推荐