详解Flink同步Kafka数据到ClickHouse分布式表

目录
  • 引言
  • 什么是ClickHouse?
  • 创建复制表
  • 通过jdbc写入

引言

业务需要一种OLAP引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定ClickHouse

什么是ClickHouse?

ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。

列式数据库更适合于OLAP场景(对于大多数查询而言,处理速度至少提高了100倍),下面详细解释了原因(通过图片更有利于直观理解),图片来源于ClickHouse中文官方文档。

行式

列式

我们使用Flink编写程序,消费kafka里面的主题数据,清洗、归一,写入到clickhouse里面去。

这里的关键点,由于第一次使用,无法分清应该建立什么格式的clickhouse表,出现了一些问题,最大的问题就是程序将数据写入了,查询发现数据不完整,只有一部分。我也在网上查了一些原因,总结下来。

为什么有时看不到已经创建好的表并且查询结果一直抖动时多时少?

常见原因1:

建表流程存在问题。ClickHouse的分布式集群搭建并没有原生的分布式DDL语义。如果您在自建ClickHouse集群时使用create table创建表,查询虽然返回了成功,但实际这个表只在当前连接的Server上创建了。下次连接重置换一个Server,您就看不到这个表了。

解决方案:

建表时,请使用create table <table_name> on cluster default语句,on cluster default声明会把这条语句广播给default集群的所有节点进行执行。示例代码如下。 Create table test on cluster default (a UInt64) Engine = MergeTree() order by tuple(); 在test表上再创建一个分布式表引擎,建表语句如下。 Create table test_dis on cluster default as test Engine = Distributed(default, default, test, cityHash64(a));

常见原因2:

ReplicatedMergeTree存储表配置有问题。ReplicatedMergeTree表引擎是对应MergeTree表引擎的主备同步增强版,在单副本实例上限定只能创建MergeTree表引擎,在双副本实例上只能创建ReplicatedMergeTree表引擎。

解决方案:

在双副本实例上建表时,请使用ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或ReplicatedMergeTree()配置ReplicatedMergeTree表引擎。其中,ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)为固定配置,无需修改。

这里引出了复制表的概念,这里介绍一下,只有 MergeTree 系列里的表可支持副本:

ReplicatedMergeTree

ReplicatedSummingMergeTree

ReplicatedReplacingMergeTree

ReplicatedAggregatingMergeTree ReplicatedCollapsingMergeTree

ReplicatedVersionedCollapsingMergeTree

ReplicatedGraphiteMergeTree

副本是表级别的,不是整个服务器级的。所以,服务器里可以同时有复制表和非复制表。副本不依赖分片。每个分片有它自己的独立副本。

创建复制表

先做好准备工作,该建表的建表,然后编写程序。在表引擎名称上加上 Replicated 前缀。例如:ReplicatedMergeTree。

  • 首先创建一个分布式数据库
create database test on cluster default_cluster;
  • 创建本地表

由于clickhouse是分布式的,创建本地表本来应该在每个节点上创建的,但是指定on cluster关键字可以直接完成,建表语句如下:

CREATE TABLE test.test_data_shade on cluster default_cluster
(
    `data` Map(String, String),
    `uid` String,
    `remote_addr` String,
    `time` Datetime64,
    `status` Int32,
    ...其它字段省略
    `dt` String
)
ENGINE = ReplicatedMergeTree()
partition by dt
order by (dt, sipHash64(uid));

这里表引擎为ReplicatedMergeTree,即有副本的表,根据dt按天分区,提升查询效率,sipHash64是一个hash函数,根据uid散列使得相同uid数据在同一个分片上面,如果有去重需求,速度更快,因为可以计算每个分片去重,再汇总一下即可。

  • 创建分布式表
CREATE TABLE test.test_data_all on cluster default_cluster as test.test_data_shade ENGINE = Distributed('default_cluster', 'test', 'test_data_shade', sipHash64(uid));

在多副本分布式 ClickHouse 集群中,通常需要使用 Distributed 表写入或读取数据,Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。

通过jdbc写入

这个我是看的官方文档,里面有2种选择,感兴趣的同学可以都去尝试一下。

这里贴一下我的Pom依赖

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1-patch</version>
    <classifier>shaded</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Flink主程序,消费kafka,做清洗,然后写入clickhouse,这都是常规操作,这里贴一下关键代码吧。

连接clickhouse有2种方式,8123端口的http方式,和基于9000端口的tcp方式。

这里官方推荐的是连接驱动是0.3.2:

<dependency>
    <!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch11</version>
    <classifier>all</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Note: ru.yandex.clickhouse.ClickHouseDriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.

官方推荐升级到0.3.2,上面表格给出了升级方法,文档地址:

github.com/ClickHouse/…

以上就是详解Flink同步Kafka数据到ClickHouse分布式表的详细内容,更多关于Flink数据同步Kafka ClickHouse的资料请关注我们其它相关文章!

(0)

相关推荐

  • 一文详解kafka序列化器和拦截器

    目录 介绍 序列化器 设置序列化和反序列化 自定义序列化 自定义反序列化 思考 拦截器 总结 介绍 本篇主要介绍kafka的拦截器和序列化器,序列化器是和数据在网络中的传输有关,数据在网络中的传输为字节流,所以生产者在发送时需要将其序列化为字节流,消费者收到消息时,需要将字节流反序列化为我们能够识别的对象,我们不难看出,这就是RPC通信,kafka中实现了很多自定义协议,我们知道,在RPC通信中,只有生产者和消费者的协议一样,才能相互传输和解析数据,在使用HTTP时,我们就不用去关注协议本身,因

  • 流式图表拒绝增删改查之框架搭建过程

    目录 前言 技术方案 数据库设计 基础模块 整体流程 最终效果 前言 作为一名练习时长两年半的夹娃工程师,常年浸泡在增删改查的业务代码里,每当金三银四来临该迭代自己简历的时候,面对自己的项目经历都十分窘迫.突然有天学弟问我在实习公司一直做缝缝补补的工作或者是一些基于封装好的RBAC(基于角色的权限管理系统)做业务的增删改查,眼下又要找工作了不知道咋写项目经验. 无论是功能.技术栈.设计都平淡无奇,问我咋整,我打开尘封已久的简历一看,操,我不也一样吗. 当时快毕业那会也是有点焦虑,难的项目看不懂简

  • go操作Kafka使用示例详解

    目录 1. Kafka介绍 1.1 Kafka是什么 1.2 Kafka的特点 1.3 常用的场景 1.4 Kafka中包含以下基础概念 1.5 消息 1.6 消息格式 2. Kafka深层介绍 2.1 架构介绍 2.2 ⼯作流程 2.3 选择partition的原则 2.4 ACK应答机制 2.5 Topic和数据⽇志 2.6 Partition结构 2.7 消费数据 3. 操作Kafka 3.1 sarama 3.2 下载及安装 3.3 连接kafka发送消息 3.4 连接kafka消费消息

  • 详解Mysql如何实现数据同步到Elasticsearch

    目录 一.同步原理 二.logstash-input-jdbc 三.go-mysql-elasticsearch 四.elasticsearch-jdbc 五.logstash-input-jdbc实现同步 六.go-mysql-elasticsearch实现同步 七.elasticsearch-jdbc实现同步 一.同步原理 基于Mysql的binlog日志订阅:binlog日志是Mysql用来记录数据实时的变化 Mysql数据同步到ES中分为两种,分别是全量同步和增量同步 全量同步表示第一次

  • 详解数据库中跨库数据表的运算

    1. 简单合并(FROM) 所谓跨库数据表,是指逻辑上同一张数据表被分别存储在不同数据库中.其原因有可能是因为数据量太大,放在一个数据库难以处理,也可能在业务上就需要将生产库和历史库分开.而不同的数据库,可能只是部署在不同的机器上的同种数据库,也可能是连类型都不同的数据库系统. 在面对跨库数据表,特别是数据库类型都不相同的情况时,数据库自带的工具往往就力所不及了,一般都需要寻找能够很好地支持多数据源类型的第三方工具,而集算器,可以说是其中的佼佼者了.下面,我们就针对几种常见的跨库混合运算情况详细

  • 微信小程序 详解Page中data数据操作和函数调用

    微信小程序 详解Page中data数据操作和函数调用 Page() 函数用来注册一个页面.接受一个 object 参数,其指定页面的初始数据.生命周期函数.事件处理函数等. //index.js <pre code_snippet_id="2049407" snippet_file_name="blog_20161214_1_1145312" name="code" class="javascript">Page(

  • 实例详解Android文件存储数据方式

    总体的来讲,数据存储方式有三种:一个是文件,一个是数据库,另一个则是网络.下面通过本文给大家介绍Android文件存储数据方式. 1.文件存储数据使用了Java中的IO操作来进行文件的保存和读取,只不过Android在Context类中封装好了输入流和输出流的获取方法. 创建的存储文件保存在/data/data/<package name>/files文件夹下. 2.操作. 保存文件内容:通过Context.openFileOutput获取输出流,参数分别为文件名和存储模式. 读取文件内容:通

  • 详解Django中views数据查询使用locals()函数进行优化

    优化场景 利用视图函数(views)查询数据之后可以通过上下文context.字典.列表等方式将数据传递给HTML模板,由template引擎接收数据并完成解析.但是通过context传递数据可能就存在在不同的视图函数中使用重复的查询语句,所以可以通过将重复查询语句设置全局变量,配合locals()函数进行数据查询与传递. 优化前 def index(request): threatname = '威胁情报展示' url = 'www.testtip.com' allthreat = Threa

  • 详解MySQL中的数据类型和schema优化

    最近在学习MySQL优化方面的知识.本文就数据类型和schema方面的优化进行介绍. 1. 选择优化的数据类型 MySQL支持的数据类型有很多,而如何选择出正确的数据类型,对于性能是至关重要的.以下几个原则能够帮助确定数据类型: 更小的通常更好 应尽可能使用可以正确存储数据的最小数据类型,够用就好.这样将占用更少的磁盘.内存和缓存,而在处理时也会耗时更少. 简单就好 当两种数据类型都能胜任一个字段的存储工作时,选择简单的那一方,往往是最好的选择.例如整型和字符串,由于整型的操作代价要小于字符,所

  • 详解实现vue的数据响应式原理

    这篇文章主要是给不了解或者没接触过 vue 响应式源码的小伙伴们看的,其主要目的在于能对 vue 的响应式原理有个基本的认识和了解,如果在面试中被问到此类问题,能够知道面试官想让你回答的是什么?「PS:文中如有不对的地方,欢迎小伙伴们指正」 响应式的理解 响应式顾名思义就是数据变化,会引起视图的更新.这篇文章主要分析 vue2.0 中对象和数组响应式原理的实现,依赖收集和视图更新我们留在下一篇文章分析. 在 vue 中,我们所说的响应式数据,一般指的是数组类型和对象类型的数据.vue 内部通过

  • 详解Java中JSON数据的生成与解析

    一.什么是JSON JSON: JavaScript Object Notation JS对象简谱,是一种类似于XML的语言.相比于XML,它更小.更快.更易解析.主要用于项目前端和Server的网络数据传输. 二.JSON的语法 对象 一个对象,由一个大括号表示{},{}中通过一个个的键值对来描述对象的属性 注意: 键与值之间使用冒号连接,多个键值对之间使用逗号分隔. 键值对的键,应使用引号引住(通常Java解析时,键不使用引号会报错,而JS能正确解析):键值对的值,可以是JS中的任意数据类型

  • 详解python读取matlab数据(.mat文件)

    我们都知道,matlab是一个非常好用的矩阵计算分析软件,然额,matlab自带的绘图效果极为锯齿,所以,这里分享一个在python中读取matlab处理后的数据.mat文件. 1.首先,我们这里先打开matlab,随便在命令行窗输入两个变量, matlab_x=1:0.01:10; matlab_y=sin(matlab_x); 2.计算处理后,matlab右边的工作区会有两个变量值,分别为matlab_y.matlab_x 3.然后,我们将鼠标放置在工作区空白位置右键,选择保存,也可以在工作

  • 实例详解esp8266解析json数据的方法

    #include <ArduinoJson.h> void setup() { Serial.begin(115200); Serial.println("这里用于测试json数据的解析"); // DynamicJsonDocument对象 定义时候我们需要定义一个大小信息+复制大小 const size_t capacity = JSON_OBJECT_SIZE(2) + 30; DynamicJsonDocument doc(capacity); // 要解析的jso

随机推荐