python实现skywalking的trace模块过滤和报警(实例代码)

  skywalking本身的报警功能,用起来视乎不是特别好用,目前想实现对skywalking的trace中的错误接口进行过滤并报警通知管理员和开发。所以自己就用python对skywalking做了二次数据清洗实现。项目方在了自己了github(https://github.com/shygit-dev/skywalking-cli-python)上了,有兴趣的同学可以做二次改造,共同学习。下面简单列出了代码内容:

sw-trace.py

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Tile:
# Author:shy
import requests
import time
import smtplib
from email.mime.text import MIMEText
import re
def interface_content_filter(trace_id):
    '''
    对详细日志内容(业务逻辑报错)进行过滤
    :param trace_id:
    :return: 【1|0】
    '''
    url = "http://172.16.53.232:50001/query"
    params = {
        "trace_id": trace_id
    }
    detail_trace_id_log = requests.request(method="GET",url=url,params=params)
    detail_trace_id_log = detail_trace_id_log.text
    print(detail_trace_id_log)
    print(type(detail_trace_id_log))
    with open("blackname_keyword_list","r",encoding="utf-8") as f:
        for line in f:
            print(line)
            result = re.search(line.strip(),detail_trace_id_log)
            print(result)
            if result != None:
                print("哥们匹配到日志黑名单关键字了:%s" % line)
                return 0
    print("提示:%s不在关键字黑名单中" % trace_id)
    return 1
def interface_filter(endpointName):
    """
    设置接口黑名单
    :param endpointName:
    :return: 【1|0】
    """
    endpointName = re.sub("\(|\)",".",endpointName)
    with open("blackname_list","r",encoding="utf-8") as f:
        bn_list = f.read()
    match_result = re.search(endpointName.strip(),bn_list)
    if match_result == None:
        print("提示:接口不存在黑名单中")
        return 1
    print("提示:接口在黑名单中")
    return 0
def trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr):
    """
    skywalking trace功能对错误接口进行过滤,默认最大一次获取2000条数据,每分钟执行一次
    :param start_time:
    :param end_time:
    :return:
    """
    url = sw_url
    data = {
          "query": "query queryTraces($condition: TraceQueryCondition) {\n  data: queryBasicTraces(condition: $condition) {\n    traces {\n      key: segmentId\n      endpointNames\n      duration\n      start\n      isError\n      traceIds\n    }\n    total\n  }}",
          "variables": {
            "condition": {
              "queryDuration": {
                "start": start_time, #"2021-12-07 1734"
                "end": end_time,
                "step": "MINUTE"
              },
              "traceState": "ERROR",
              "paging": {
                "pageNum": 1,
                "pageSize": per_page_size,
                "needTotal": "true"
              },
              "queryOrder": "BY_START_TIME"
              # "traceId": "b669d0069be84fce82261901de412e7c.430.16388637511348105"
            }
          }
        }
    result = requests.request(method="post",url=url,json=data)
    i = 0
    # print(result.content)
    # print(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:])))))
    with open("mail.html","w",encoding="utf-8") as f:
        f.write('<head><meta charset="UTF-8"><title>Title</title><style>.t {border-right: 2px solid black;border-bottom: 2px solid black;}.t th,td {border-top: 2px solid black;border-left: 2px solid black;font-size: 10px;}</style></head><body><div style="color:red;font-size=15px;">最近15分钟统计:</div><table class="t" border="0" cellspacing="0" cellpadding="10px"><thead><tr style="<th style="width: 100px;">时间</th><th>持续时长</th><th>接口名称</th><th>追踪ID</th></tr></thead><tbody>')
    for trace in result.json()["data"]["data"]["traces"]:
        # print(trace["endpointNames"])
        print("时间:%s\n" % time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:])))),
              "持续时长:%s\n" % trace["duration"],
              "接口名称:%s\n" % trace["endpointNames"][0],
              "跟踪ID:%s" % trace["traceIds"][0])
        # print(time.localtime(1638869640.194))
        i+=1
        print(i)
        s_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:]))))
        dur_time =  trace["duration"]
        endpointName = trace["endpointNames"][0]
        trace_id = trace["traceIds"][0]
        # 调用接口黑名单过滤功能
        result = interface_filter(endpointName)
        if result == 0:
            print("哥们进入黑名单了!",endpointName)
            continue
        # 调用关键字黑名单过滤功能
        keyword_result = interface_content_filter(trace_id)
        if keyword_result == 0:
            print("哥们进入关键字黑名单了!", trace_id)
            continue
        with open("mail.html","a",encoding="utf-8") as f:
            f.write('<tr><td>%s</td><td>%s</td><td>%s</td><td><a href="http://%s/query?trace_id=%s" rel="external nofollow" >%s</a></td></tr>' %(s_time,dur_time,endpointName,trace_detail_addr,trace_id,trace_id))
    with open("mail.html","a",encoding="utf-8") as f:
        f.write('</tbody></table></body>')
def send_mail(receiver):
    """
    发送报错接口邮件
    :return:
    """
    server = "mail.test.com"
    sender = "sa@test.com"
    sender_pwd = "1qaz@WSX"
    send_addr = "sa@test.com"
    receiver = receiver
    with open("mail.html","r",encoding="utf-8") as f:
        content = f.read()
    if re.search("<td>",content) == None:
        print("无报错接口!",content)
        return 0
    print("邮件前",content)
    msg_mail = MIMEText(content,"html","utf-8")
    msg_mail["Subject"] = "Skywalking报错接口统计"
    msg_mail["From"] = sender
    msg_mail["To"] = receiver
    server_obj = smtplib.SMTP_SSL(server)
    server_obj.connect(server,465)
    server_obj.login(sender,sender_pwd)
    server_obj.sendmail(send_addr,receiver,msg_mail.as_string())
if __name__ == "__main__":
    # 设定查询时间间隔,默认900s(15min)
    end_time = time.time()
    start_time = end_time - 900
    start_time=time.strftime("%Y-%m-%d %H%M",time.localtime(start_time))
    end_time = time.strftime("%Y-%m-%d %H%M", time.localtime(end_time))
    print(start_time)
    print(end_time)
    sw_url = "http://172.16.53.232:9412/graphql" # skywalking的前端服务的地址和端口
    per_page_size = 5000  #指定一次获取endpoint接口的数目
    trace_detail_addr = "127.0.0.1:5000" #指定查询指定trace_id详细日志
    receiver = "shy@test.com"  #报警邮件接收人地址
    trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr)
    send_mail(receiver)
    # interface_filter()
    # interface_content_filter("3c4212dd2dd548d394ba312c4619405d.104.16390380592724487")

sw-trace-id.py

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Tile:
# Author:shy
import requests
import time
from flask import Flask,request
app = Flask(__name__)
@app.route("/query",methods=["get"])
def trace_id_query():
    """
    查询指定trace_id详细日志信息
    :return: f.read()
    """
    trace_id = request.args.get("trace_id")
    url="http://172.16.53.232:9412/graphql"
    # url="http://skywalking.roulw.com/graphql"
    data = {
      "query": "query queryTrace($traceId: ID!) {\n  trace: queryTrace(traceId: $traceId) {\n    spans {\n      traceId\n      segmentId\n      spanId\n      parentSpanId\n      refs {\n        traceId\n        parentSegmentId\n        parentSpanId\n        type\n      }\n      serviceCode\n      serviceInstanceName\n      startTime\n      endTime\n      endpointName\n      type\n      peer\n      component\n      isError\n      layer\n      tags {\n        key\n        value\n      }\n      logs {\n        time\n        data {\n          key\n          value\n        }\n      }\n    }\n  }\n  }",
      "variables": {
        "traceId": trace_id
      }
    }
    result = requests.request(method="post",url=url,json=data)
    with open("detail_log", "w", encoding="utf-8") as f:
        f.write("<div style='color: red;font-size: 30px;'>生产Skywalking报错接口跟踪日志日志:<br /></div>")
    for trace_id in result.json()["data"]["trace"]["spans"]:
        if trace_id["isError"]:
            # print(trace_id)
            print("服务名称:%s\n" % trace_id["serviceCode"],
                  "开始时间:%s\n" % trace_id["startTime"],
                  "接口名称:%s\n" % trace_id["endpointName"],
                  "peer名称:%s\n" % trace_id["peer"],
                  "tags名称:%s\n" % trace_id["tags"],
                  "详细日志:%s" % trace_id["logs"])
            content = "服务名称:%s<br />开始时间:%s<br />接口名称:%s<br />peer名称:%s<br />tags名称:%s" % (trace_id["serviceCode"],trace_id["startTime"],trace_id["endpointName"],trace_id["peer"],trace_id["tags"])
            with open("detail_log","a",encoding="utf-8") as f:
                f.write(content)
                f.write("<br />********详细日志**********<br />")
            for logs in trace_id["logs"]:
                for log in logs["data"]:
                    if log["key"] == "message":
                        print(log["value"])
                        with open("detail_log","a",encoding="utf-8") as f:
                            f.write(log["value"])
                        # return log["value"]
                    elif log["key"] == "stack":
                        print(log["value"])
                        with open("detail_log","a",encoding="utf-8") as f:
                            f.write(log["value"])
            with open("detail_log", "a", encoding="utf-8") as f:
                f.write("<div style='color: red;font-size: 20px;'><br />========下一个接口信息=========<br /></div>")
    with open("detail_log","r",encoding="utf-8") as f:
        return f.read()
if __name__ == "__main__":
    # trace_id = "14447ae7199c40a2b9862411daba180b.2142.16388920322367785"
    # trace_id_query(trace_id)
    app.run()

到此这篇关于python实现skywalking的trace模块过滤和报警的文章就介绍到这了,更多相关python trace模块过滤和报警内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python中使用logging和traceback模块记录日志和跟踪异常

    logging模块 logging模块用于输出运行日志,可以设置不同的日志等级,保存信息到日志文件中等. 相比print,logging可以设置日志的等级,控制在发布版本中的输出内容,并且可以指定日志的输出格式. 1. 使用logging在终端输出日志 #!/usr/bin/env python # -*- coding:utf-8 -*- import logging # 引入logging模块 # 设置打印日志级别 CRITICAL > ERROR > WARNING > INFO

  • Python异常模块traceback用法实例分析

    本文实例讲述了Python异常模块traceback用法.分享给大家供大家参考,具体如下: traceback模块被用来跟踪异常返回信息. 如下例所示: import traceback try: raise SyntaxError, "traceback test" except: traceback.print_exc() 将会在控制台输出类似结果: Traceback (most recent call last):   File "H:PythonWorkSpaceT

  • Python基于traceback模块获取异常信息

    除了使用 sys.exc_info() 方法获取更多的异常信息之外,还可以使用 traceback 模块,该模块可以用来查看异常的传播轨迹,追踪异常触发的源头. 下面示例显示了如何显示异常传播轨迹: class SelfException(Exception): pass def main(): firstMethod() def firstMethod(): secondMethod() def secondMethod(): thirdMethod() def thirdMethod():

  • python实现skywalking的trace模块过滤和报警(实例代码)

      skywalking本身的报警功能,用起来视乎不是特别好用,目前想实现对skywalking的trace中的错误接口进行过滤并报警通知管理员和开发.所以自己就用python对skywalking做了二次数据清洗实现.项目方在了自己了github(https://github.com/shygit-dev/skywalking-cli-python)上了,有兴趣的同学可以做二次改造,共同学习.下面简单列出了代码内容: sw-trace.py #!/usr/bin/env python # _*

  • python的Crypto模块实现AES加密实例代码

    本文主要探索的是python的Crypto模块实现AES加密,分享了具体实现代码,下面看看具体内容. 学了使用Crypto模块的AES来加密文件,现在记录下来便于后边儿查看. 在刚开始知道这个模块的时候,连基本的Crypto模块的安装都花了很多很多时间来搞,也不知道什么情况反正是折腾很久了才安装起的,记得是包安装起来了,但使用的时候始终提示找不到Crypto.Cipher模块.然后怎么解决的呢? 一.把我的python换成了64位的,本来电脑就是64位的也不知道之前是啥情况安装成32位的了.(O

  • Python中摘要算法MD5,SHA1简介及应用实例代码

    关于算法的学习,小编觉得编程语言中的算法大都有一些相通的地方,主要的方面一是了解这一算法能用来干什么,另一方面,学习它在这类编程语言中怎么实现. 摘要算法又称哈希算法.散列算法.它通过一个函数,把任意长度的数据转换为一个长度固定的数据串(通常用16进制的字符串表示).摘要算法就是通过摘要函数f()对任意长度的数据data计算出固定长度的摘要digest,目的是为了发现原始数据是否被人篡改过(不同的data计算出来的摘要不同). 常见的摘要算法有MD5和SHA1 MD5 import hashli

  • python使用xlrd和xlwt读写Excel文件的实例代码

    安装模块 如果使用的是Linux系统,并且安装了pip,可以直接使用pip安装xlrd, xlwt: pip install xlwt pip install xlrd 也可以从官网下载源代码安装: https://pypi.org/project/xlwt/1.1.2/ https://pypi.org/project/xlrd/ 下载tar.gz文件,解压,并转到解压后的目录中,找到setup.py,输入命令: sudo python setup.py install 安装完成. 导入模块:

  • Python中免验证跳转到内容页的实例代码

    相信很多人在浏览网页时,经常会碰到需要输入验证码才可以继续浏览的情况吧,遇到这种问题,大多数人只能进行繁琐的注册验证,今天小编教大家只要使用python就可以免验证方法. 以经常用到的解答网站--上学吧为例,在网站里点击答案页面,会显示验证后才可以查看提示,下面就使用python实现跳过验证码. 我们需要通过python构造随机的 X-Forwarded-For 信息来绕过 ASP 网站的 IP 检测,可以实现对输入的网址正确性进行检查.对验证码核验不通过时的处理等等. python免验证跳转页

  • 利用Python实现Windows下的鼠标键盘模拟的实例代码

    本文介绍了利用Python实现Windows下的鼠标键盘模拟的实例代码,分享给大家 本来用按键精灵是可以实现我的需求,而且更简单,但既然学python ,就看一下呗. 依赖: PyUserInput pip install PyUserInput PyUserInput 依赖 pyhook,所以还得安装 pyhook.按需下载,下载地址. 我是 win10 64 位 python 2.7,用的是第二个,下载之后用解压软件打开,把 pyHook放到C:\Python27\Lib\site-pack

  • Python操作Redis之设置key的过期时间实例代码

    Expire 命令用于设置 key 的过期时间.key 过期后将不再可用. r.set('2', '4028b2883d3f5a8b013d57228d760a93') #成功就返回True 失败就返回False,下面的20表示是20秒 print r.expire('2',20) #如果时间没事失效我们能得到键为2的值,否者是None print r.get('2') 对于一个已经存在的key,我们可以设置其过期时间,到了那个时间后,当你再去访问时,key就不存在了 有两种方式可以设置过期时间

  • python实现定时自动备份文件到其他主机的实例代码

    定时将源文件或目录使用WinRAR压缩并自动备份到本地或网络上的主机 1.确保WinRAR安装在默认路径或者把WinRAR.exe添加到环境变量中 2.在代码里的sources填写备份的文件或目录,target_dir填写备份目的目录 3.delete_source_file为备份完后是否删除源文件(不删除子文件夹) 4.备份成功/失败后生成备份日志 按照格式,填写源目的: sources = [r'E:\目录1', r'E:\目录2\b.txt'] #例:= [ r'E:\test\1234.

  • python 反编译exe文件为py文件的实例代码

    我们用pyinstaller把朋友文件打包成exe文件,但有时候我们需要还原,我们可以用pyinstxtractor.py 用法: python pyinstxtractor.py xxx.exe 之后得到一个这样结构的文件夹 --- xxx.exe_extracted -- out00-PYZ.pyz_extracted - 各种.pyc文件 -- out00-PYZ.pyz -- some -- others -- xxx(注意这些都是没后缀的) 然后再终端pip install uncom

  • python 文件的基本操作 菜中菜功能的实例代码

    python  文件的基本操作 菜中菜 文件操作 ​ open():打开 ​ file:文件的位置(路径) ​ mode:操作文件模式 ​ encoding:文件编码方式 ​ f :文件句柄 f = open("1.txt",mode = 'r',encoding = 'utf-8') print(f.read()) f.close 1.文件操作模式: ​ r,w,a(重要) ​ rb,wb,ab(次要) ​ r+,w+,a+ 1.1 r/w/a 1. r操作: f = open('1

随机推荐