教你编写 Pipeline 脚本的方法

目录
  • 前言
  • 调试 grok 和 pipeline
  • Grok 通配搜索
  • 多行如何处理
  • Pipeline 字段命名注意事项
  • 完整 Pipeline 示例
  • 如何在一个 Pipeline 中切割多种不同格式的日志?
  • 如何丢弃字段切割
  • add_pattern()转义问题

前言

Pipeline 编写较为麻烦,为此,DataKit 中内置了简单的调试工具,用以辅助大家来编写 Pipeline 脚本。

调试 grok 和 pipeline

指定 pipeline 脚本名称,输入一段文本即可判断提取是否成功

Pipeline 脚本必须放在/pipeline 目录下。

$ datakit pipeline your_pipeline.p -T '2021-01-11T17:43:51.887+0800  DEBUG io  io/io.go:458  post cost 6.87021ms'
Extracted data(cost: 421.705µs): # 表示切割成功
{
"code"   : "io/io.go: 458",       # 对应代码位置
"level"  : "DEBUG",               # 对应日志等级
"module" : "io",                  # 对应代码模块
"msg"    : "post cost 6.87021ms", # 纯日志内容
"time"   : 1610358231887000000    # 日志时间(Unix 纳秒时间戳)    "message": "2021-01-11T17:43:51.887+0800  DEBUG io  io/io.g o:458  post cost 6.87021ms"
}

提取失败示例(只有 message 留下了,说明其它字段并未提取出来):

$ datakit pipeline other_pipeline.p -T '2021-01-11T17:43:51.887+0800  DEBUG io  io/io.g o:458  post cost 6.87021ms'
{
"message": "2021-01-11T17:43:51.887+0800  DEBUG io  io/io.g o:458  post cost 6.87021ms"
} 

如果调试文本比较复杂,可以将它们写入一个文件(sample.log),用如下方式调试:

$ datakit pipeline your_pipeline.p -F sample.log

更多 Pipeline 调试命令,参见 datakit help pipeline。

Grok 通配搜索

由于 Grok pattern 数量繁多,人工匹配较为麻烦。DataKit 提供了交互式的命令行工具grokq(grok query):

datakit tool --grokq
grokq > Mon Jan 25 19:41:17 CST 2021   # 此处输入你希望匹配的文本
2 %{DATESTAMP_OTHER: ?}        # 工具会给出对应对的建议,越靠前匹配月精确(权重也越大)。前面的数字表明权重。
0 %{GREEDYDATA: ?}

grokq > 2021-01-25T18:37:22.016+0800
4 %{TIMESTAMP_ISO8601: ?}      # 此处的 ? 表示你需要用一个字段来命名匹配到的文本
0 %{NOTSPACE: ?}
0 %{PROG: ?}
0 %{SYSLOGPROG: ?}
0 %{GREEDYDATA: ?}             # 像 GREEDYDATA 这种范围很广的 pattern,权重都较低                                       # 权重越高,匹配的精确度越大
grokq > Q                              # Q 或 exit 退出
Bye!

Windows 下,请在 Powershell 中执行调试。

多行如何处理

在处理一些调用栈相关的日志时,由于其日志行数不固定,直接用GREEDYDATA这个 pattern 无法处理如下情况的日志:

2022-02-10 16:27:36.116 ERROR 1629881 --- [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task

java.lang.NullPointerException: null

at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)

at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)

at java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)

at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)

at java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)

at java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)

at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)

at java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)

at java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)

此处可以使用

GREEDYLINES

规则来通配,如(/usr/local/datakit/pipeline/test.p):

add_pattern('_dklog_date', '%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}%{INT}')
grok(_, '%{_dklog_date:log_time}\\s+%{LOGLEVEL:Level}\\s+%{NUMBER:Level_value}\\s+---\\s+\\[%{NOTSPACE:thread_name}\\]\\s+%{GREEDYDATA:Logger_name}\\s+(\\n)?(%{GREEDYLINES:stack_trace})'

# 此处移除 message 字段便于调试
drop_origin_data()

将上述多行日志存为multi-line.log,调试一下:

$ datakit --pl test.p --txt "$(<multi-line.log)" 

得到如下切割结果:


"Level": "ERROR",  "Level_value": "1629881", 
"Logger_name": "o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task", 
"log_time": "2022-02-10 16:27:36.116", 
"stack_trace": "java.lang.NullPointerException: null\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)\n\tat java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)\n\tat java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)\n\tat java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)\n\tat java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)\n\tat java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)\n\tat java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)\n\tat java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)", 
 
"thread_name": "scheduling-1"
}

Pipeline 字段命名注意事项

在所有 Pipeline 切割出来的字段中,它们都是指标(field)而不是标签(tag)。由于行协议约束,我们不应该切割出任何跟 tag 同名的字段。这些 Tag 包含如下几类:

  • DataKit 中的全局 Tag
  • 日志采集器中自定义的 Tag

另外,所有采集上来的日志,均存在如下多个保留字段。我们不应该去覆盖这些字段,否则可能导致数据在查看器页面显示不正常。

字段名 类型 说明
source string(tag) 日志来源
service string(tag) 日志对应的服务,默认跟 service 一样
status string(tag) 日志对应的等级
message string(field) 原始日志
time int 日志对应的时间戳

当然我们可以通过特定的 Pipeline 函数覆盖上面这些 tag 的值。

一旦 Pipeline 切割出来的字段跟已有 Tag 重名(大小写敏感),都会导致如下数据报错。故建议在 Pipeline 切割中,绕开这些字段命名。

# 该错误在 DataKit monitor 中能看到<br data-filtered="filtered">same key xxx in tag and field

完整 Pipeline 示例

这里以 DataKit 自身的日志切割为例。DataKit 自身的日志形式如下:

2021-01-11T17:43:51.887+0800  DEBUG io  io/io.go:458  post cost 6.87021ms

编写对应 pipeline:

# pipeline for datakit log
# Mon Jan 11 10:42:41 CST 2021
# auth: tanb

grok(_, '%{_dklog_date:log_time}%{SPACE}%{_dklog_level:level}%{SPACE}%{_dklog_mod:module}%{SPACE}%{_dklog_source_file:code}%{SPACE}%{_dklog_msg:msg}')
rename("time", log_time) # 将 log_time 重名命名为 time
default_time(time)       # 将 time 字段作为输出数据的时间戳
drop_origin_data()       # 丢弃原始日志文本(不建议这么做)

这里引用了几个用户自定义的 pattern,如_dklog_date、_dklog_level。我们将这些规则存放<datakit安装目录>/pipeline/pattern 下。

注意,用户自定义 pattern 如果需要==全局生效==(即在其它 Pipeline 脚本中应用),必须放置在<DataKit安装目录/pipeline/pattern/>目录下):

$ cat pipeline/pattern/datakit
# 注意:自定义的这些 pattern,命名最好加上特定的前缀,以免跟内置的命名冲突(内置 pattern 名称不允许覆盖)
# 自定义 pattern 格式为:
#    <pattern-name><空格><具体 pattern 组合>
_dklog_date %{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}%{INT}
_dklog_level (DEBUG|INFO|WARN|ERROR|FATAL)
_dklog_mod %{WORD}
_dklog_source_file (/?[\w_%!$@:.,-]?/?)(\S+)?
_dklog_msg %{GREEDYDATA}

现在 pipeline 以及其引用的 pattern 都有了,就能通过 DataKit 内置的 pipeline 调试工具,对这一行日志进行切割:

# 提取成功示例
$ ./datakit --pl dklog_pl.p --txt '2021-01-11T17:43:51.887+0800  DEBUG io  io/io.go:458  post cost 6.87021ms'
Extracted data(cost: 421.705µs):
{
"code": "io/io.go:458",
"level": "DEBUG",
"module": "io",
"msg": "post cost 6.87021ms",
"time": 1610358231887000000
}

FAQPipeline 调试时,为什么变量无法引用?

Pipeline 为:

json(_, message, "message")
json(_, thread_name, "thread")
json(_, level, "status")
json(_, @timestamp, "time")

其报错如下:

[E] new piepline failed: 4:8 parse error: unexpected character: '@'

A: 对于有特殊字符的变量,需将其用两个`修饰一下:

json(_, `@timestamp`, "time") 

参见【Pipeline 的基本语法规则】https://docs.guance.com/developers/pipeline/

Pipeline 调试时,为什么找不到对应的 Pipeline 脚本?

命令如下:

$ datakit pipeline test.p -T "..."
[E] get pipeline failed: stat /usr/local/datakit/pipeline/test.p: no such file or directory

A: 调试用的 Pipeline 脚本,需将其放置到/pipeline目录下。

如何在一个 Pipeline 中切割多种不同格式的日志?

在日常的日志中,因为业务的不同,日志会呈现出多种形态,此时,需写多个 Grok 切割,为提高 Grok 的运行效率,可根据日志出现的频率高低,优先匹配出现频率更高的那个 Grok,这样,大概率日志在前面几个 Grok 中就匹配上了,避免了无效的匹配。

在日志切割中,Grok 匹配是性能开销最大的部分,故避免重复的 Grok 匹配,能极大的提高 Grok 的切割性能。

grok(_, "%{NOTSPACE:client_ip} %{NOTSPACE:http_ident} ...")
if client_ip != nil {
# 证明此时上面的 grok 已经匹配上了,那么就按照该日志来继续后续处理
...
} else {
# 这里说明是不同的日志来了,上面的 grok 没有匹配上当前的日志
grok(_, "%{date2:time} \\[%{LOGLEVEL:status}\\] %{GREEDYDATA:msg} ...")

    if status != nil {
 # 此处可再检查上面的 grok 是否匹配上...
} else {
# 未识别的日志,或者,在此可再加一个 grok 来处理,如此层层递进
}
}

如何丢弃字段切割

在某些情况下,我们需要的只是日志==中间的几个字段==,但不好跳过前面的部分,比如

200 356 1 0 44 30032 other messages

其中,我们只需要 44 这个值,它可能代码响应延迟,那么可以这样切割(即 Grok 中不附带:some_field 这个部分):

grok(_, "%{INT} %{INT} %{INT} %{INT:response_time} %{GREEDYDATA}")

add_pattern()转义问题

大家在使用 add_pattern()添加局部模式时,容易陷入转义问题,比如如下这个 pattern(用来通配文件路径以及文件名):

(/?[\w_%!$@:.,-]?/?)(\S+)?

如果我们将其放到全局 pattern 目录下(即pipeline/pattern目录),可这么写:

# my-testsource_file (/?[\w_%!$@:.,-]?/?)(\S+)?

如果使用 add_pattern(),就需写成这样:

# my-test.p
add_pattern('source_file', '(/?[\\w_%!$@:.,-]?/?)(\\S+)?')

即这里面反斜杠需要转义。

到此这篇关于如何编写 Pipeline 脚本的文章就介绍到这了,更多相关Pipeline 脚本内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 构建及部署jenkins pipeline实现持续集成持续交付脚本

    目录 前言 新增的步骤脚本 需要注意的点: 关于执行启动应用脚本 关于健康检查 线程休眠 健康检查方式 遇到的问题及小技巧 小技巧: 问题: 具体的安全策略异常如下: 解决方案: 文末结语 前言 之前的文章中,已经全面介绍过jenkins pipeline的特点及用途,以及实操了一把,将我们的构建产物jar包丢到了目标主机.这篇是接着上篇的实操,实现构建即部署的脚本实现.会在之前的git clone(拉源码),maven build(构建),deploy jar(上传jia包)的基础上,在新增两

  • jenkins插件Pipeline脚本jenkinsfile操作指南

    目录 前言 一,安装pipeline支持插件 二,创建流式Item 三,编写pipeline脚本 脚本如下: 添加运行参数 四,尝试构建任务 五,pipeline的一点技巧 文末结语 前言 jenkins是一款流行的开源持续集成软件,插件丰富,扩展灵活.2.0后推出pipeline流式构建,支持构建任务脚本化.本文主要旨在使用jenkins 的pipeline功能完成java maven项目的打包,上传jar到目标服务器.pipeline推出时间不长,实际使用的不是很多,网上基本没啥参考资料,官

  • 教你编写 Pipeline 脚本的方法

    目录 前言 调试 grok 和 pipeline Grok 通配搜索 多行如何处理 Pipeline 字段命名注意事项 完整 Pipeline 示例 如何在一个 Pipeline 中切割多种不同格式的日志? 如何丢弃字段切割 add_pattern()转义问题 前言 Pipeline 编写较为麻烦,为此,DataKit 中内置了简单的调试工具,用以辅助大家来编写 Pipeline 脚本. 调试 grok 和 pipeline 指定 pipeline 脚本名称,输入一段文本即可判断提取是否成功 P

  • 教你编写bat脚本Windows批处理

    目录 常见用途 bat命令速查 常用参数 echo 注释 Rem 使用::声明 dir :获取目录内容 > :将输出重定向到文件 变量 命令行参数 set命令 字符串 数值 局部变量 vs 全局变量 环境变量输出 字符串 创建 空字符串 字符串拼接 字符串长度 转int 截取 右对齐 删除字符串 删除两端(保留中间) 删除空格:= 替换 数组 创建 访问 修改 迭代 数组长度 结构体 if语句 if defined :变量是否存在 if exists:文件是否存在 if errorlevel:测

  • 教你编写SQLMap的Tamper脚本过狗

    目录 测试环境 最新版某狗 测试方法 bypass and order by union select 加个换行试试 获取表字段 编写tamper 测试环境 最新版某狗 测试方法 安全狗其实是比较好绕的WAF,绕过方法很多,但这里我们就用一种:注释混淆 一招鲜吃遍天 注释混淆,其实就是在敏感位置添加垃圾字符注释,常用的垃圾字符有/.!.*.%等 这里再解释一下内联注释,因为后面要用到: MySQL内联注释: /*!xxxxxxx*/ !后面的语句会当作SQL语句直接执行 但是如果!后面跟着MyS

  • sql server编写通用脚本实现获取一年前日期的方法

    问题: 在数据库编程开发中,有时需要获取一年前的日期,以便以此为时间的分界点,查询其前后对应的数据量.例如: 1. 想查询截止到一年前当天0点之前的数据量,以及一年前当天0点开始到现在的数据量. 2. 想查询截止到一年前当天24点之前的数据量,以及一年前当天24点开始到现在的数据量. 3. 想查询截止到一年前当月1日0点之前的数据量,以及一年前当月1日0点开始到现在的数据量. 4. 想查询截止到一年前当月最后一天24点之前的数据量,以及一年前当月最后一天24点开始到现在的数据量. 以上这四种情况

  • linux环境下编写shell脚本实现启动停止tomcat服务的方法

    第一步:以管理员的身份进入控制台,在指定目录下新建一个shell脚本,我这里命名为tomcat.sh 第二步:编写shell脚本 #!/bin/bash tomcat_home=/usr/tomcat/apache-tomcat-8.0.48 SHUTDOWN=$tomcat_home/bin/shutdown.sh STARTTOMCAT=$tomcat_home/bin/startup.sh case $1 in start) echo "启动$tomcat_home" $STAR

  • 编写shell脚本实现tomcat定时重启的方法

    最近我在学生价买的低配服务器上部署了一个很吃内存的网页,导致 tomcat 内存经常溢出而崩溃. 于是我上网找了一些教程编写了一个简单的每天定时启动 tomcat 的脚本,特此记录一下 我的环境是 centos 7 1. 在某个目录新建一个 .sh 脚本文件 vim tomcatStart.sh 2. 在 tomcatStart.sh 文件里面写入一下代码 #!/bin/bash /etc/profile tomcatPath="/usr/local/tomcat9" binPath=

  • 教你编写Windows的VBScript与Mac的AppleSCript脚本解放双手

    目录 一.Windows 篇 —— VBScript 1.效果图 2.VBS 简介 3.代码实现 4.代码详解 5.其他语法 6.附:VBS 特殊字符表格 二.Mac 篇 —— AppleScript 1.效果图 2.代码实现 3.代码详解 一.Windows 篇 —— VBScript 最近发现 windows 上有一个好玩的东西,叫做 VBScript,可以用来自动执行一些操作. 1.效果图 先来看下最终效果吧! 以上就是一个简单的 VBS 脚本,运行时自动打开 Chrome 浏览器,然后自

  • 在 Swift 中编写Git Hooks脚本的方法

    目录 前言 用git hooks自动生成提交信息 为什么我使用Swift? 让我们开始吧 编写git钩子 检索提交消息 注意: 检索问题编号 修改提交信息 设置git钩子 测试结果 参考资料 前言 这周,我决定完成因为工作而推迟了一周的TODO事项来改进我的Git工作流程. 为了在提交的时候尽可能多的携带上下文信息,我们让提交信息包含了正在处理的JIRA编号.这样,将来如果有人回到我们现在正在提交的源代码,输入​ ​git blame​ ​,就能很容易的找出JIRA的编号. 每次提交都包含这些信

  • 使用Node.js为其他程序编写扩展的基本方法

     准备开始 首先我们用下面的目录结构来创建一个节点通知(node-notify)文件夹.   复制代码 代码如下: . |-- build/                   # This is where our extension is built. |-- demo/ |   `-- demo.js              # This is a demo Node.js script to test our extension. |-- src/ |   `-- node_gtkno

随机推荐