MLSQL Stack如何让流调试更加简单详解

前言

有一位同学正在调研MLSQL Stack对流的支持。然后说了流调试其实挺困难的。经过实践,希望实现如下三点:

  • 能随时查看最新固定条数的Kafka数据
  • 调试结果(sink)能打印在web控制台
  • 流程序能自动推测json schema(现在spark是不行的)

实现这三个点之后,我发现调试确实就变得简单很多了。

流程

首先我新建了一个kaf_write.mlsql,里面方便我往Kafka里写数据:

set abc='''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
''';
load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where
kafka.bootstrap.servers="127.0.0.1:9092";

这样我每次运行,数据就能写入到Kafka.

接着,我写完后,需要看看数据是不是真的都写进去了,写成了什么样子:

!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;

这句话表示,我要采样Kafka 10条Kafka数据,该Kafka的地址为127.0.0.1:9092,主题为wow.运行结果如下:

没有什么问题。接着我写一个非常简单的流式程序:

-- the stream name, should be uniq.
set streamName="streamExample";

-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;

load kafka.`wow` options
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;

select * from newkafkatable1
as table21;

-- print in webConsole instead of terminal console.
save append table21
as webConsole.``
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";

运行结果如下:

在终端我们也可以看到实时效果了。

补充

当然,MLSQL Stack 还有对流还有两个特别好地方,第一个是你可以对流的事件设置http协议的callback,以及对流的处理结果再使用批SQL进行处理,最后入库。参看如下脚本:

-- the stream name, should be uniq.
set streamName="streamExample";

-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options
stepSizeRange="0-3"
as newkafkatable1;

-- aggregation
select cast(value as string) as k from newkafkatable1
as table21;

!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- output the the result to console.

save append table21
as custom.``
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`;
'''
and checkpointLocation="/tmp/cpl15";

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。

(0)

相关推荐

  • 分享101个MySQL调试与优化技巧

    MySQL是一个功能强大的开源数据库.随着越来越多的数据库驱动的应用程序,人们一直在推动MySQL发展到它的极限.这里是101条调节和优化MySQL安装的技巧.一些技巧是针对特定的安装环境的,但这些思路是通用的.我已经把他们分成几类,来帮助你掌握更多MySQL的调节和优化技巧. MySQL 服务器硬件和操作系统调节: 1. 拥有足够的物理内存来把整个InnoDB文件加载到内存中--在内存中访问文件时的速度要比在硬盘中访问时快的多. 2. 不惜一切代价避免使用Swap交换分区 – 交换时是从硬盘读

  • Mysql LONGBLOB 类型存储二进制数据 (修改+调试+整理)

    在DBMS中线要创建数据库test,table bintest,data字段数据类型用LONGBLOB即可测试 //测试文件c:\\test.iso,你可以找任何一个文件修改为即可,我找的是一个exe程序,修改为test.iso而已 //最大测试过加入文件大小为650M(一个正真的iso文件) //注意:还要修改my.ini文件中的max_allowed_packet字段,我设置的是 复制代码 代码如下: //max_allowed_packet = 1024M //#define host "

  • Mysql LONGTEXT 类型存储大文件(二进制也可以) (修改+调试+整理)

    #include "stdafx.h" //是前一篇的姊妹篇 //代码来自网络,我学习整理了一下,测试通过,下面的参数 //需要设置为你自己的 //在DBMS中线要创建数据库www,table www,file字段数据类型用LONGTEXT即可测试 //测试文件c:\\test.iso,你可以找任何一个文件修改为即可,我找的是一个exe程序,修改为test.iso而已 //最大测试过加入文件大小为650M(一个正真的iso文件) //注意:还要修改my.ini文件中的max_allow

  • 新手配置 PHP 调试环境(IIS+PHP+MYSQL)

    目的:配置运行PHP的环境(IIS+PHP+MYSQL+CF+Perl)支持php+cgi+asp+jsp等 操作系统:windows2000 advance server(sp3)简体中文版 需要相关软件: 一:php-4.3.0-Win32.zip ,到相应站点下载,我上传文件不方便,以下同. 二:ActivePerl-5.6.1.635-MSWin32-x86.msi ,注意,必须下载安装文件,就是说.msi的,不要下原代码了,我也不会弄原代码,加装这个支持cgi,如果你不想支持cgi也可

  • Mysql 插入中文及中文查询 (修改+调试)

    //我修改之,能正常运行,测试环境为mysql5.0,xp //关键是设置对字符集,设置gbk,gb2312测试通过,utf8测试未通过 //在运行程序前先建立数据库jj,注意下面几个参数(修改为你自己的) 复制代码 代码如下: // char *host = "localhost"; // char *user = "root"; // char *pass = "674800"; // char *db = "jj"; #

  • MySQL UDF调试方式debugview的相关方法

    MySQL的UDF实质就是一个不需要设置入口点的动态连接库(*Nix称之为共享库).对于DLL的调试可谓个人有个法.现在我介绍一下一个非常简单的易用的调试方法.这一方法直接利用Windows API,语言无关.开发工具无关.项目类型无关,典型的三无调试方法.并且,我们从这里讨论的调试方法支持远程调试,对于一时无法掌握开发工具原本调试器而又急于寻找程序错误的朋友此方法非常实用!  首先我们需要下载接收端,当然有心人也可以自己写一个.在 http://www.sysinternals.com/ntw

  • GDB调试Mysql实战之源码编译安装

    下载源码 git clone https://github.com/mysql/mysql-server.git cd mysql-server git checkout 5.7 编译安装 安装依赖 yum install -y cmake make gcc gcc-c++ ncurses-devel bison gdb 需要注意的一点,需要指定 boost 路径,会 cmake 的时候自动下载 cd BUILD; cmake .. -DDOWNLOAD_BOOST=1 -DWITH_BOOST

  • MLSQL Stack如何让流调试更加简单详解

    前言 有一位同学正在调研MLSQL Stack对流的支持.然后说了流调试其实挺困难的.经过实践,希望实现如下三点: 能随时查看最新固定条数的Kafka数据 调试结果(sink)能打印在web控制台 流程序能自动推测json schema(现在spark是不行的) 实现这三个点之后,我发现调试确实就变得简单很多了. 流程 首先我新建了一个kaf_write.mlsql,里面方便我往Kafka里写数据: set abc=''' { "x": 100, "y": 200,

  • python调试模块ipdb详解

    目录 1. 调试python 1.1 使用ipdb 1.2 常用命令 1. 调试python ipdb是用来python中用以交互式debug的模块,可以直接利用pip安装; 其功能类似于pycharm中 python控制台,而使用ipdb 的优点,便是直接在代码中调试,避免了在python控制台,或者重新设置一些简单变量. pip install ipdb 1.1 使用ipdb 当程序运行到ipdb.set_trace()的地方会自动进入debug模式. for i in range(5):

  • java进行远程部署与调试及原理详解

    这篇文章主要介绍了java进行远程部署与调试及原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 远程调试,特别是当你在本地开发的时候,你需要调试服务器上的程序时,远程调试就显得非常有用. JAVA 支持调试功能,本身提供了一个简单的调试工具JDB,支持设置断点及线程级的调试同时,不同的JVM通过接口的协议联系,本地的Java文件在远程JVM建立联系和通信.此篇是Intellij IDEA远程调试的教程汇总和原理解释,知其然而又知其所以然.

  • Java基础之Stream流原理与用法详解

    目录 一.接口设计 二.创建操作 三.中间操作 四.最终操作 五.Collect收集 Stream简化元素计算 一.接口设计 从Java1.8开始提出了Stream流的概念,侧重对于源数据计算能力的封装,并且支持序列与并行两种操作方式:依旧先看核心接口的设计: BaseStream:基础接口,声明了流管理的核心方法: Stream:核心接口,声明了流操作的核心方法,其他接口为指定类型的适配: 基础案例:通过指定元素的值,返回一个序列流,元素的内容是字符串,并转换为Long类型,最终计算求和结果并

  • Kotlin下Rxjava的基础用法及流式调用示例详解

    目录 前言 基础用法 fromXXX create interval & timer 指定线程 observeOn & subscribeOn Flowable 流式调用 背压 前言 万事开头难,写文章也是,现在越来越不知道开头怎么写了,所以在前言中,简单介绍下RxJava吧,第一次听说还是以前做Android开发的时候,那时候好多库中都使用了Rxjava,而在网络请求中,也有很多都是使用Rxjava去写,但自己却没怎么在项目中写过,而在搜索资料中发现,微信中搜rxjava时,最多介绍他的

  • Java中IO流 RandomAccessFile类实例详解

    Java中IO流 RandomAccessFile类实例详解 RandomAccessFile java提供的对文件内容的访问,既可以读文件,也可以写文件. 支持随机访问文件,可以访问文件的任意位置. java文件模型,在硬盘上的文件是byte byte byte存储的,是数据的集合 打开文件,有两种模式,"rw"读写."r"只读:RandomAccessFile raf = new RandomAccessFile(file, "rw");,文

  • C++中stack、queue、vector的用法详解

    一.栈(stack) 引入头文件 #include<stack> 常用的方法 empty() 堆栈为空则返回真 pop() 移除栈顶元素 push() 在栈顶增加元素 size() 返回栈中元素数目 top() 返回栈顶元素 3.实例代码 #include<iostream> #include<stack> using namespace std; int main(){ //创建栈 s stack<int> s; //将元素压入栈 for(int i=0;

  • Java打印流原理及实例详解

    这篇文章主要介绍了Java打印流原理及实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 平时我们在控制台打印输出,是调用print方法和println方法完成的,这两个方法都来自于java.io.PrintStream类,该类能够方便地打印各种数据类型的值,是一种便捷的输岀方式. PrintStream类 PrintStream类,为其他输出流添加了功能,使他们能够方便的打印各种数据值表示格式. PrintStream类的特点: 只负责数

  • Java使用FileInputStream流读取文件示例详解

    一.File流概念 JAVA中针对文件的读写操作设置了一系列的流,其中主要有FileInputStream,FileOutputStream,FileReader,FileWriter四种最为常用的流 二.FileInputStream 1)FileInputStream概念  FileInputStream流被称为文件字节输入流,意思指对文件数据以字节的形式进行读取操作如读取图片视频等 2)构造方法 2.1)通过打开与File类对象代表的实际文件的链接来创建FileInputStream流对象

  • phpStudy vscode 搭建debug调试的教程详解

    下载地址 phpstudy:https://www.xp.cn/download.html vscode:https://code.visualstudio.com/ 设置 phpstudy版本:7.3.4nts [Xdebug] zend_extension=D:/phpstudy_pro/Extensions/php/php7.3.4nts/ext/php_xdebug.dll xdebug.collect_params=1 xdebug.collect_return=1 xdebug.au

随机推荐