Go语言Elasticsearch数据清理工具思路详解

微服务架构中收集通常大家都采用ELK进行日志收集,同时我们还采用了SkyWalking进行链路跟踪,而SkyWalking数据存储也用到了ES,SkyWalking每天产生大量的索引数据,如下:

WX20211008-104751@2x

这里一天大概产生了700左右个索引数据。对历史的链路数据我们不做过多的保留。

这里我整理了个小工具,可以定期清理es数据。

一、清理思路

可以看到索引数据都是以日期结尾,我们可以根据日期去匹配索引数据,并对索引进行删除。这里需要考虑一点,有的Es服务开启了索引保护机制,不能通过*index去删除,只能通过索引的全名称去删除。所以我们整体流程如下:

1、获取es服务中全部索引数据。

2、根据当前时间-保留天数,获取要删除的日期。

3、通过字符串匹配,判断索引中是否包含要删除的日期,如果包含则进行删除。

4、工具友好性,我们可以通过配置文件配置ES服务地址、日期格式化类型、保留天数等信息。

二、代码实现

2.1、获取ES服务中全部索引数据

要获取Es服务中全部索引数据,我们首先连接Es服务器,这里我们使用github.com/olivere/elastic/v7库操作Es。

连接ES:

func GetEsClient(data Data) *elastic.Client {
    Init()
    file := "./eslog.log"
    logFile, _ := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)
    client, err := elastic.NewClient(
        elastic.SetURL(data.Host),
        elastic.SetSniff(false),
        elastic.SetInfoLog(log.New(logFile, "ES-INFO: ", 0)),
        elastic.SetTraceLog(log.New(logFile, "ES-TRACE: ", 0)),
        elastic.SetErrorLog(log.New(logFile, "ES-ERROR: ", 0)),
    )
    if err != nil {
        return nil
    }
    return client
}

我们通过GetEsClient方法,连接ES,并返回client,供后续方法使用。这里的Data是包含了ES服务地址等信息,我们后面会给出Data的数据结构。

获取全部索引数据

func getIndex(data Data) map[string]interface{} {
    client := GetEsClient(data)
    mapping := client.GetMapping()
    service := mapping.Index("*")
    result, err := service.Do(context.Background())
    if err != nil {
        fmt.Printf("create index failed, err: %v\n", err)
        return nil
    }
    return result
}

通过client.GetMapping().Index("*")API获取es服务中全部的索引数据,并返回,数据格式如下:

WX20211008-110537@2x

这次我们获取全部索引完成。

2.2、根据当前时间-保留天数,获取要删除的日期

我们根据当前时间-保留天数,获取当前需要删除的日期数据。我们通过GoLang内置的函数库time完成该功能的实现。

currentTime := time.Now()//获取当前时间
oldTime := currentTime.AddDate(0, 0, data.Day)//通过配置文件获取保留天数
format := oldTime.Format(data.IndexFmt)//通过配置文件获取序列化日期格式

2.3、通过字符串匹配,判断索引中是否包含要删除的日期,如果包含则进行删除

这里通过字符串匹配进行判断是否需要删除索引数据。

func delIndex(data Data) {
    currentTime := time.Now()
    oldTime := currentTime.AddDate(0, 0, data.Day)
    format := oldTime.Format(data.IndexFmt)
    index := getIndex(data)//获取全部索引
    for k := range index {//遍历索引数据
        fmt.Println("key:", k, "format:", format)
        if find := strings.Contains(k, format); find { //判断索引中是否包含要删除的日期格式,
            DelIndex(data, k)//如果包含则调用DelIndex方法删除
        }
    }
}
// DelIndex 删除 index
func DelIndex(data Data, index ...string) bool {
    client := GetEsClient(data)
    response, err := client.DeleteIndex(index...).Do(context.Background())
    if err != nil {
        fmt.Printf("delete index failed, err: %v\n", err)
        return false
    }
    return response.Acknowledged
}

通过DeleteIndexAPI删除指定的数据。

2.4、通过配置文件灵活配置数据

这里我们定义了Config和Data对象,对象结构如下:

type Config struct {
    Data []Data `json:"data"`
}

type Data struct {
    Host     string `json:"host"`
    IndexFmt string `json:"index_fmt"`
    Day      int    `json:"day"`
}

配置文件内容如下:

{
  "data": [
    {
      "host": "http://ip1:9200",//服务IP
      "index_fmt": "20060102",//日期格式化
      "day": -1 //保留天数 保留1天
    },
    {
      "host": "http://ip2:9200/",
      "index_fmt": "20060102",
      "day": -1
    },
    {
      "host": "http://ip3:32093",
      "index_fmt": "2006.01.02",
      "day": -7  //保留天数 保留7天
    }
  ]
}

我们通过Init方法加载配置文件到Config;

var config Config

func Init() {
    JsonParse := NewJsonStruct()
    //下面使用的是相对路径,config.json文件和main.go文件处于同一目录下
    JsonParse.Load("config/config.json", &config)
}

type JsonStruct struct {
}

func NewJsonStruct() *JsonStruct {
    return &JsonStruct{}
}

func (jst *JsonStruct) Load(filename string, v interface{}) {
    //ReadFile函数会读取文件的全部内容,并将结果以[]byte类型返回
    data, err := ioutil.ReadFile(filename)
    if err != nil {
        return
    }

    //读取的数据为json格式,需要进行解码
    err = json.Unmarshal(data, v)
    if err != nil {
        return
    }
}

编写Main方法运行程序:

func main() {
    Init()
    for i, datum := range config.Data {
        fmt.Printf("config data Host is [%s], fmt is [%s]\n", datum.Host, datum.IndexFmt)
        println(i)
        delIndex(datum)
    }
}

这里我们依然遍历配置文件中的多个服务配置。可以同时管理多个Es服务。

三、完整代码

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "strings"
    "time"
)

type Config struct {
    Data []Data `json:"data"`
}

type Data struct {
    Host     string `json:"host"`
    IndexFmt string `json:"index_fmt"`
    Day      int    `json:"day"`
}

var config Config

func Init() {
    JsonParse := NewJsonStruct()
    //下面使用的是相对路径,config.json文件和main.go文件处于同一目录下
    JsonParse.Load("config/config.json", &config)
}

type JsonStruct struct {
}

func NewJsonStruct() *JsonStruct {
    return &JsonStruct{}
}

func (jst *JsonStruct) Load(filename string, v interface{}) {
    //ReadFile函数会读取文件的全部内容,并将结果以[]byte类型返回
    data, err := ioutil.ReadFile(filename)
    if err != nil {
        return
    }

    //读取的数据为json格式,需要进行解码
    err = json.Unmarshal(data, v)
    if err != nil {
        return
    }
}

func delIndex(data Data) {
    currentTime := time.Now()
    oldTime := currentTime.AddDate(0, 0, data.Day)
    format := oldTime.Format(data.IndexFmt)
    index := getIndex(data)
    for k := range index {
        fmt.Println("key:", k, "format:", format)
        if find := strings.Contains(k, format); find {
            DelIndex(data, k)
        }
    }
}

func main() {
    Init()
    for i, datum := range config.Data {
        fmt.Printf("config data Host is [%s], fmt is [%s]\n", datum.Host, datum.IndexFmt)
        println(i)
        delIndex(datum)
    }
}
package main

import (
    "context"
    "fmt"
    "github.com/olivere/elastic/v7"
    "log"
    "os"
    "time"
)

// GetEsClient 初始化客户端
func GetEsClient(data Data) *elastic.Client {
    Init()
    file := "./eslog.log"
    logFile, _ := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
    client, err := elastic.NewClient(
        elastic.SetURL(data.Host),
        elastic.SetSniff(false),
        elastic.SetInfoLog(log.New(logFile, "ES-INFO: ", 0)),
        elastic.SetTraceLog(log.New(logFile, "ES-TRACE: ", 0)),
        elastic.SetErrorLog(log.New(logFile, "ES-ERROR: ", 0)),
    )
    if err != nil {
        return nil
    }
    return client
}

// IsDocExists 判断索引是否存储
func IsDocExists(data Data, id string, index string) bool {
    client := GetEsClient(data)
    defer client.Stop()
    exist, _ := client.Exists().Index(index).Id(id).Do(context.Background())
    if !exist {
        log.Println("ID may be incorrect! ", id)
        return false
    }
    return true
}

// PingNode 是否联通
func PingNode(data Data) {
    start := time.Now()
    client := GetEsClient(data)
    info, code, err := client.Ping(data.Host).Do(context.Background())
    if err != nil {
        fmt.Printf("ping es failed, err: %v", err)
    }
    duration := time.Since(start)
    fmt.Printf("cost time: %v\n", duration)
    fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
}

// GetDoc 获取文档
func GetDoc(data Data, id string, index string) (*elastic.GetResult, error) {
    client := GetEsClient(data)
    defer client.Stop()
    if !IsDocExists(data, id, index) {
        return nil, fmt.Errorf("id不存在")
    }
    esResponse, err := client.Get().Index(index).Id(id).Do(context.Background())
    if err != nil {
        return nil, err
    }
    return esResponse, nil
}

// CreateIndex 创建 index
func CreateIndex(data Data, index, mapping string) bool {
    client := GetEsClient(data)
    result, err := client.CreateIndex(index).BodyString(mapping).Do(context.Background())
    if err != nil {
        fmt.Printf("create index failed, err: %v\n", err)
        return false
    }
    return result.Acknowledged
}

// DelIndex 删除 index
func DelIndex(data Data, index ...string) bool {
    client := GetEsClient(data)
    response, err := client.DeleteIndex(index...).Do(context.Background())
    if err != nil {
        fmt.Printf("delete index failed, err: %v\n", err)
        return false
    }
    return response.Acknowledged
}

func getIndex(data Data) map[string]interface{} {
    client := GetEsClient(data)
    mapping := client.GetMapping()
    service := mapping.Index("*")
    result, err := service.Do(context.Background())
    if err != nil {
        fmt.Printf("create index failed, err: %v\n", err)
        return nil
    }
    return result
}

代码已经上传github需要的可自行下载。

到此这篇关于Go语言Elasticsearch数据清理工具的文章就介绍到这了,更多相关Go Elasticsearch数据清理工具内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Django项目之Elasticsearch搜索引擎的实例

    1.使用Docker安装Elasticsearch及其扩展 获取镜像,可以通过网络pull sudo docker image pull delron/elasticsearch-ik:2.4.6-1.0 或者加载镜像文件 sudo docker load -i elasticsearch-ik-2.4.6_docker.tar 修改elasticsearch的配置文件 elasticsearc-2.4.6/config/elasticsearch.yml第54行,更改ip地址为本机ip地址 n

  • golang elasticsearch Client的使用详解

    elasticsearch 的client ,通过 NewClient 建立连接,通过 NewClient 中的 Set.URL设置访问的地址,SetSniff设置集群 获得连接 后,通过 Index 方法插入数据,插入后可以通过 Get 方法获得数据(最后的测试用例中会使用 elasticsearch client 的Get 方法) func Save(item interface{}) { client, err := elastic.NewClient( elastic.SetURL("h

  • Django利用elasticsearch(搜索引擎)实现搜索功能

     1.在Django配置搜索结果页的路由映射 """pachong URL Configuration The `urlpatterns` list routes URLs to views. For more information please see: https://docs.djangoproject.com/en/1.10/topics/http/urls/ Examples: Function views 1. Add an import: from my_ap

  • golang 单点登录系统(go-sso)的实现

    这是一个基于Go语言开发的单点登录系统,实现手机号注册.手机号+验证码登录.手机号+密码登录.账号登出等功能,用户认证采用cookie和jwt两种方式.收发短信相关方法已提供,仅需根据短信通道提供商提供的接口做相应的参数配置即可使用. 环境介绍 golang语言:go1.13.3+ . 数据库:mysql5.7 缓存:redis3.0 项目地址 https://github.com/guyan0319/go-sso 依赖包: github.com/dgrijalva/jwt-go github.

  • Django对接elasticsearch实现全文检索的示例代码

    目录 前言 第一步:首先安装相关的依赖包 第二步:在django项目配置文件settings.py中注册应用 第三步:在django项目配置文件settings.py中指定搜索的后端 第四步:创建索引类 第五步:在templates目录中创建text字段使用的模板文件 第六步:手动更新索引 第七步:创建haystack序列化器 第八步:创建视图类 第九步:添加路由 第十步:结果 前言 说到搜索,第一时间想到的是mysql数据库的like语句 但是,假如你的数据库有几千万条数据,name字段没有索

  • django使用haystack调用Elasticsearch实现索引搜索

    前言: 在做一个商城项目的时候,需要实现商品搜索功能. 说到搜索,第一时间想到的是数据库的 select * from tb_sku where name like %苹果手机% 或者django的 SKU.objects.filter(name__contains="苹果手机") 但是,假如你的数据库有几千万条数据,name字段没有索引,可能查询需要十几分钟,用户可能会等你?那为什么不给name字段增加索引?商品表不仅仅是用来查询,也会经常修改数据,新增删除数据等.建立索引后,做增删

  • Go语言Elasticsearch数据清理工具思路详解

    微服务架构中收集通常大家都采用ELK进行日志收集,同时我们还采用了SkyWalking进行链路跟踪,而SkyWalking数据存储也用到了ES,SkyWalking每天产生大量的索引数据,如下: WX20211008-104751@2x 这里一天大概产生了700左右个索引数据.对历史的链路数据我们不做过多的保留. 这里我整理了个小工具,可以定期清理es数据. 一.清理思路 可以看到索引数据都是以日期结尾,我们可以根据日期去匹配索引数据,并对索引进行删除.这里需要考虑一点,有的Es服务开启了索引保

  • 使用.NET 6开发TodoList应用之引入数据存储的思路详解

    需求 作为后端CRUD程序员(bushi,数据存储是开发后端服务一个非常重要的组件.对我们的TodoList项目来说,自然也需要配置数据存储.目前的需求很简单: 需要能持久化TodoList对象并对其进行操作: 需要能持久化TodoItem对象并对其进行操作: 问题是,我们打算如何存储数据? 存储组件的选择非常多:以MSSQL Server/Postgres/MySql/SQLite等为代表的关系型数据库,以MongoDB/ElasticSearch等为代表的非关系型数据库,除此之外,我们还可以

  • vue3 el-table结合seamless-scroll实现表格数据滚动的思路详解

    github开源地址:https://github.com/xfy520/vue3-seamless-scroll 步骤 1. 安装 npm install vue3-seamless-scroll --save 2. vue代码 <template> <el-table :data="tableData" style="width: 100%" class="top" > <el-table-column prop

  • PHP切割整数工具类似微信红包金额分配的思路详解

    Composer地址:https://packagist.org/packages/werbenhu/php-number-slicing GitHub地址:https://github.com/werbenhu/php-number-slicing 主要代码:NumberSlicing.php 思路:将数字按精度放大倍数,比如切割数字1,切割的份数是10,精度是0.01,则将1放大100 X 10倍,然后再来对加了1000倍权重后的值进行切割.切割完成之后,再将权重去除,保证总值是1. <?p

  • python爬虫利用selenium实现自动翻页爬取某鱼数据的思路详解

    基本思路: 首先用开发者工具找到需要提取数据的标签列 利用xpath定位需要提取数据的列表 然后再逐个提取相应的数据: 保存数据到csv: 利用开发者工具找到下一页按钮所在标签: 利用xpath提取此标签对象并返回: 调用点击事件,并循环上述过程: 最终效果图: 代码: from selenium import webdriver import time import re class Douyu(object): def __init__(self): # 开始时的url self.start

  • C语言顺序表的基本结构与实现思路详解

    目录 一.顺序表的概念与结构 1.线性表的解释 2.顺序表概念解释 二.顺序表的思路及代码实现详解 1.静态顺序表的实现 2.动态顺序表思路及代码实现 2.1 动态顺序表的整体思路 2.2 定义结构体的实现 2.3 初始化结构体 2.4 结构体打印 2.5 检查数组容量 2.6 头插 2.7 尾插 2.8 头删 2.9 尾删 2.10 任意删除 2.11 任意插入 2.12 空间释放 三.顺序表代码整合 SeqList.h SeqList.c test.c 一.顺序表的概念与结构 1.线性表的解

  • python 数据的清理行为实例详解

    python 数据的清理行为实例详解 数据清洗主要是指填充缺失数据,消除噪声数据等操作,主要还是通过分析"脏数据"产生的原因和存在形式,利用现有的数据挖掘手段去清洗"脏数据",然后转化为满足数据质量要求或者是应用要求的数据. 1.try 语句还有另外一个可选的子句,它定义了无论在任何情况下都会执行的清理行为. 例如: >>>try: raiseKeyboardInterrupt finally: print('Goodbye, world!') G

  • vuejs实现本地数据的筛选分页功能思路详解

    今天项目需要一份根据本地数据的筛选分页功能,好吧,本来以为很简单,网上搜了搜全是ajax获取的数据,这不符合要求啊,修改起来太费力气,还不如我自己去写,不多说直接上代码 效果图: 项目需要:点击左侧进行数据筛选,实现自动分页,自动生成页数,点击自动跳转 项目代码:js代码 var subList=new Vue({ el:'#main', data:{ // subcontentData为本地数据 subContents:subcontentData, // 页面需要展现的数据 yemianda

  • Python实现大数据收集至excel的思路详解

    一.在工程目录中新建一个excel文件 二.使用python脚本程序将目标excel文件中的列头写入,本文省略该部分的code展示,可自行网上查询 三.以下code内容为:实现从接口获取到的数据值写入excel的整体步骤 1.整体思路: (1).根据每日调取接口的日期来作为excel文件中:列名为"收集日期"的值 (2).程序默认是每天会定时调取接口并获取接口的返回值并写入excel中(我使用的定时任务是:linux下的contab) (3).针对接口异常未正确返回数据时,使用特殊符号

  • C语言 数据存储方式知识点详解

    C语言 数据存储方式 一.源码 一个数的原码(原始的二进制码)有如下特点: 最高位做为符号位,0表示正,为1表示负 其它数值部分就是数值本身绝对值的二进制数 负数的原码是在其绝对值的基础上,最高位变为1 下面数值以1字节的大小描述: 十进制数 原码 +15 0000 1111 -15 1000 1111 +0 0000 0000 -0 1000 0000 注:原码表示法简单易懂,与带符号数本身转换方便,只要符号还原即可,但当两个正数相减或不同符号数相加时,必须比较两个数哪个绝对值大,才能决定谁减

随机推荐