Flink自定义Sink端实现过程讲解

目录
  • Sink介绍
  • UML关系
  • Flink addSink
  • 案例

Sink介绍

在Fink官网中sink端只是给出了常规的write api.在我们实际开发场景中需要将flink处理的数据写入kafka,hbase kudu等外部系统。

UML关系

自定义Sink需要实现父类的接口和继承抽象类。

上面是Sink的继承关系

Flink addSink

// 方法需要SinkFunction的对象
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();
		// configure the type if needed
		if (sinkFunction instanceof InputTypeConfigurable) {
			((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
		}
		StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
		DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
		getExecutionEnvironment().addOperator(sink.getTransformation());
		return sink;
	}

SinkFunction

// SinkFunction是一个接口
public interface SinkFunction<IN> extends Function, Serializable {
   //公共方法
	default void invoke(IN value, Context context) throws Exception {
		invoke(value);
	}
}

RichSinkFunction

@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
	private static final long serialVersionUID = 1L;
}

其他继承接口SinkFunction的类:

案例

自定义HbaseSink

public class HbaseSink extends RichSinkFunction<Tuple2<Integer, String>> {
    Logger logger = LoggerFactory.getLogger(HbaseSink.class);
    org.apache.hadoop.conf.Configuration configuration;
    Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //获取hbase 的链接信息
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");
        //创建conn
        connection = ConnectionFactory.createConnection(configuration);
        logger.info("创建链接成功");
    }
    @Override
    public void invoke(Tuple2<Integer, String> value, Context context) throws Exception {
        //往habse 里面插入数据
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Table table = connection.getTable(TableName.valueOf("torder_count"));
        Put put = new Put(value.f1.getBytes(StandardCharsets.UTF_8));
        put.addColumn("info".getBytes(), // 列族
                "order_total".getBytes(StandardCharsets.UTF_8), //特征字段
                value.f0.toString().getBytes()); //属性值
        put.addColumn("info".getBytes(), "insert_time".getBytes(), format.format(new Date(System.currentTimeMillis())).getBytes());
        table.put(put);
        table.close();
        logger.info("=====一条数据写入成功======,时间:"+value.f1+", 值:"+value.f0);
    }
    @Override
    public void close() throws Exception {
        super.close();
        connection.close();
    }

通过以上案例我们熟悉了addSink函数的操作。

到此这篇关于Flink自定义Sink端实现过程讲解的文章就介绍到这了,更多相关Flink自定义Sink内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java Flink窗口触发器Trigger的用法详解

    目录 定义 Trigger 源码 TriggerResult 源码 Flink 预置的Trigger EventTimeTrigger源码 ProcessingTimeTrigger源码 常见窗口的Trigger 滚动窗口 滑动窗口 会话窗口 全局窗口 定义 Trigger确定窗口(由窗口分配器形成)何时准备好由窗口函数处理.每个WindowAssigner都带有一个默认值Trigger.如果默认触发器不符合您的需求,您可以使用trigger(…). Trigger 源码 public abst

  • Java Flink与kafka实现实时告警功能过程

    目录 引出问题 demo设计 环境搭建 flink程序代码 项目演示 告警系统架构 引出问题 项目使用告警系统的逻辑是将实时数据保存到本地数据库再使用定时任务做判断,然后产生告警数据.这种方式存在告警的延时实在是太高了.数据从产生到保存,从保存到判断都会存在时间差,按照保存数据定时5分钟一次,定时任务5分钟一次.最高会产生10分钟的误差,这种告警就没什么意义了. demo设计 为了简单的还原业务场景,做了简单的demo假设 实现一个对于学生成绩评价的实时处理程序 数学成绩,基准范围是90-140

  • Flink自定义Sink端实现过程讲解

    目录 Sink介绍 UML关系 Flink addSink 案例 Sink介绍 在Fink官网中sink端只是给出了常规的write api.在我们实际开发场景中需要将flink处理的数据写入kafka,hbase kudu等外部系统. UML关系 自定义Sink需要实现父类的接口和继承抽象类. 上面是Sink的继承关系 Flink addSink // 方法需要SinkFunction的对象 public DataStreamSink<T> addSink(SinkFunction<T

  • Python自定义元类的实例讲解

    1.说明 一个类没有声明自己的元类,默认他的元类就是type,除了使用元类type,用户也可以通过继承type来自定义元类. 2.实例 我们可以使用类属性 __metaclass__ 把一个类的创建过程,转交给其它地方. class A(object): __metaclass__ = ... # 这个类的创建转交给其他地方 pass 先定义了类 A,然后定义了一个类属性 __metaclass__,这个属性表示创建类 A 的过程,转交给其它地方处理. 类属性 __metaclass__ 可以是

  • SpringBoot整合JPA框架实现过程讲解

    目录 一. Spring Boot数据访问概述 二. Spring Data JPA简介 2.1 编写ORM实体类 2.2 编写Repository接口 2.2.1 继承XXRepository<T, ID>接口 2.2.2 操作数据的多种方式 2.2.3 @Transactional事务管理 2.2.4 @Moditying注解 2.3.5 复杂条件查询 三. 使用Spring Boot整合JPA 3.1 添加Spring Data JPA依赖启动器 3.2 编写ORM实体类 3.3 编写R

  • SpringCloud Gateway动态转发后端服务实现过程讲解

    目录 前言 一.概述 二.项目中加入依赖 三.配置文件 四.动态路由数据存储格式 五.后端服务动态转发 六.单元测试 前言 API网关的核心功能是统一流量入口,实现路由转发,SpringCloudGateway是API网关开发的技术之一,此外比较流行的还有Kong和ApiSix,这2个都是基于OpenResty技术栈. 简单的路由转发可以通过SpringCloudGateway的配置文件实现,在一些业务场景种,会需要动态替换路由配置中的后端服务地址,单纯靠配置文件无法满足这种需求. 本文介绍一种

  • Spring MVC中自定义拦截器的实例讲解

    1. 引言 拦截器(Interceptor)实现对每一个请求处理前后进行相关的业务处理,类似于Servlet的Filter. 我们可以让普通的Bean实现HandlerIntercpetor接口或继承HandlerInterceptorAdapter类来实现自定义拦截器. 通过重写WebMvcConfigurerAdapter的addIntercetors方法来注册一个计算每一次请求的处理时间的拦截器. 2. 自定义拦截器的实现 2.1 定义拦截器 新建LogInterceptor类,并继承Ha

  • vue自定义移动端touch事件之点击、滑动、长按事件

    用法: **HTML** <div id="app" class="box" v-tap="vuetouch" //vuetouch为函数名,如没有参数,可直接写函数名 v-longtap="{fn:vuetouch,name:'长按'}" //如果有参数以对象形式传,fn 为函数名 v-swipeleft="{fn:vuetouch,name:'左滑'}" v-swiperight="{f

  • Java自定义实现equals()方法过程解析

    这篇文章主要介绍了Java自定义实现equals()方法过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 以常见的自定义Date类型为例,没有经验的朋友可能会觉得直接比较年月日即可,从而写出以下的实现 public class MyDate implements Comparable<MyDate> { private final int year; private final int month; private final int

  • React工作流程及Error Boundaries实现过程讲解

    目录 什么是Error Boundaries 步骤1:捕获错误 步骤2:构造callback 执行callback 总结 这里简单讲解下React工作流程,后文有用.分为三步: 触发更新 render阶段:计算更新会造成的副作用 commit阶段:在宿主环境执行副作用 副作用有很多,比如: 插入DOM节点 执行useEffect回调 好了,让我们进入主题. 什么是Error Boundaries React提供了两个与错误处理相关的API: getDerivedStateFromError:静态

  • MySQL 数据持久化过程讲解

    目录 1. 过程简述 2. 内存中的操作 3. 磁盘的持久化 3.1 事务日志的作用 3.2 表结构的两步存储 1. 过程简述 理解MySQL数据的持久化过程,能很好的帮助我们加深对于MySQL底层的理解,在本文,我们以一种通俗的方式梳理一下这个过程,帮助大家建立起初步的认识,如果大家感兴趣,可以去深入学习与研究这个过程. MySQL数据的存储总体上可以分为两部分,内存中的存储过程以及硬盘的持久化存储,这里,就涉及到了内存中buffer poll和redo log以及磁盘上的事务日志和表结构,在

  • Spring Security自定义登录页面认证过程常用配置

    目录 一.自定义登录页面 1.编写登录页面 2.修改配置类 3.编写控制器 二. 认证过程其他常用配置 1.失败跳转 1.1编写页面 1.2修改表单配置 1.3添加控制器方法 1.4设置fail.html不需要认证 2.设置请求账户和密码的参数名 2.1源码简介 2.2修改配置 2.3修改页面 3.自定义登录成功处理器 3.1源码分析 3.2代码实现 4.自定义登录失败处理器 4.1源码分析 4.2代码实现 一.自定义登录页面 虽然Spring Security给我们提供了登录页面,但是对于实际

随机推荐