Golang监听日志文件并发送到kafka中

目录
  • 前言
  • 涉及的golang库和可视化工具:
  • 工作的流程
  • 环境准备
  • 代码分层
  • 关键的代码
    • main.go
    • kafka.go
    • tail.go

前言

日志收集项目的准备中,本文主要讲的是利用golang的tail库,监听日志文件的变动,将日志信息发送到kafka中。

涉及的golang库和可视化工具:

go-ini,sarama,tail其中:

  • go-ini:用于读取配置文件,统一管理配置项,有利于后其的维护
  • sarama:是一个go操作kafka的客户端。目前我用于向kefka发送消息
  • tail:类似于linux的tail命令了,读取文件的后几行。如果文件有追加数据,会检测到。就是通过它来监听日志文件

可视化工具:

offsetexplorer:是kafka的可视化工具,这里用来查看消息是否投递成功

工作的流程

  • 加载配置,初始化saramakafka
  • 起一个的协程,利用tail不断去监听日志文件的变化。
  • 主协程中一直阻塞等待tail发送消息,两者通过一个管道通讯。一旦主协程接收到新日志,组装格式,然后发送到kafka中

环境准备

环境的话,确保zookeeperkafka正常运行。因为还没有使用sarama读取数据,使用offsetexplorer来查看任务是否真的投递成功了。

代码分层

serve来存放写tail服务类和sarama服务类,conf存放ini配置文件

main函数为程序入口

关键的代码

main.go

main函数做的有:构建配置结构体,映射配置文件。调用和初始化tail,srama服务。

package main

import (
	"fmt"
	"sarama/serve"

	"github.com/go-ini/ini"
)

type KafkaConfig struct {
	Address     string `ini:"address"`
	ChannelSize int    `ini:"chan_size"`
}
type TailConfig struct {
	Path     string `ini:"path"`
	Filename string `ini:"fileName"`
	// 如果是结构体,则指明分区名
	Children `ini:"tailfile.children"`
}
type Config struct {
	KafkaConfig `ini:"kafka"`
	TailConfig  `ini:"tailfile"`
}
type Children struct {
	Name string `ini:"name"`
}

func main() {
	// 加载配置
	var cfg = new(Config)
	err := ini.MapTo(cfg, "./conf/go-conf.ini")
	if err != nil {
		fmt.Print(err)
	}
	// 初始化kafka
	ks := &serve.KafukaServe{}
	// 启动kafka消息监听。异步
	ks.InitKafka([]string{cfg.KafkaConfig.Address}, int64(cfg.KafkaConfig.ChannelSize))
	// 关闭主协程时,关闭channel
	defer ks.Destruct()

	// 初始化tail
	ts := &serve.TailServe{}
	ts.TailInit(cfg.TailConfig.Path + "/" + cfg.TailConfig.Filename)
	// 阻塞
	ts.Listener(ks.MsgChan)

}

kafka.go

有3个方法 :

  • InitKafka,组装配置项以及初始化接收消息的管道,
  • Listener,监听管道消息,收到消息后,将消息组装,发送到kafka
  • Destruct, 关闭管道
package serve

import (
	"fmt"

	"github.com/Shopify/sarama"
)

type KafukaServe struct {
	MsgChan chan string
	//err         error
}

func (ks *KafukaServe) InitKafka(addr []string, chanSize int64) {

	// 读取配置
	config := sarama.NewConfig()
	// 1. 初始化生产者配置
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 选择分区
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 成功交付的信息
	config.Producer.Return.Successes = true

	ks.MsgChan = make(chan string, chanSize)

	go ks.Listener(addr, chanSize, config)

}

func (ks *KafukaServe) Listener(addr []string, chanSize int64, config *sarama.Config) {
	//  连接kafka
	var kafkaClient, _ = sarama.NewSyncProducer(addr, config)
	defer kafkaClient.Close()
	for {
		select {
		case content := <-ks.MsgChan:
			//
			msg := &sarama.ProducerMessage{
				Topic: "weblog",
				Value: sarama.StringEncoder(content),
			}
			partition, offset, err := kafkaClient.SendMessage(msg)
			if err != nil {
				fmt.Println(err)
			}
			fmt.Println("分区,偏移量:")
			fmt.Println(partition, offset)
			fmt.Println("___")
		}

	}
}

func (ks *KafukaServe) Destruct() {
	close(ks.MsgChan)
}

tail.go

主要包括了两个方法:

  • TailInit初始化,组装tail配置。Listener
  • Listener,保存kafka服务类初始化之后的管道。监听日志文件,如果有新日志,就往管道里发送
package serve

import (
	"fmt"

	"github.com/hpcloud/tail"
)

type TailServe struct {
	tails *tail.Tail
}

func (ts *TailServe) TailInit(filenName string) {
	config := tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	}
	// 打开文件开始读取数据

	ts.tails, _ = tail.TailFile(filenName, config)

	// if err != nil {
	// 	fmt.Println("tails %s failed,err:%v\n", filenName, err)
	// 	return nil, err
	// }
	fmt.Println("启动," + filenName + "监听")
}

func (ts *TailServe) Listener(MsgChan chan string) {
	for {
		msg, ok := <-ts.tails.Lines
		if !ok {
			// todo
			fmt.Println("数据接收失败")
			return
		}
		fmt.Println(msg.Text)
		MsgChan <- msg.Text
	}
}

// 测试案例
func Demo() {
	filename := `E:\xx.log`
	config := tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	}
	// 打开文件开始读取数据
	tails, err := tail.TailFile(filename, config)
	if err != nil {
		fmt.Println("tails %s failed,err:%v\n", filename, err)
		return
	}
	var (
		msg *tail.Line
		ok  bool
	)
	fmt.Println("启动")
	for {
		msg, ok = <-tails.Lines
		if !ok {
			fmt.Println("tails file close reopen,filename:$s\n", tails.Filename)
		}
		fmt.Println("msg:", msg.Text)
	}
}

到此这篇关于Golang监听日志文件并发送到kafka中的文章就介绍到这了,更多相关Golang 监听日志文件 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang监听文件变化的实例

    废话不多说,直接上官网demo package main import ( "log" "github.com/fsnotify/fsnotify" ) func main() { watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatal(err) } defer watcher.Close() done := make(chan bool) go func() { for { select {

  • golang 监听服务的信号,实现平滑启动,linux信号说明详解

    监听服务的信号,实现平滑启动,linux信号说明 package main import ( "context" "fmt" "golang.org/x/sync/errgroup" "net/http" "os" "os/signal" "syscall" ) func main() { g, ctx := errgroup.WithContext(context.

  • jquery与google map api结合使用 控件,监听器

    Google Maps JavaScript. API可以让您在自己的网页上使用Google地图.在使用API之前,您应该先申请一 个API key,申请API key请到:http://code.google.com/apis/maps/signup.html.这里假设你获取到的key是:ABQIAA. 关于jquery的获取不再此处累赘,网上有许多关于jquery的介绍. 接着我们就使用JQuery和Google Maps JavaScript. API来结合表现一下google map的有

  • 关于golang监听rabbitmq消息队列任务断线自动重连接的问题

    golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务 需求背景: goalng常驻内存任务脚本监听rbmq执行任务 任务脚本由supervisor来管理 当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态 假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了 如果是短时间的停止重启,supervisor是可以即时唤醒该程序.如果服务器长时间没有恢复正常运行,程序就会出

  • Golang监听日志文件并发送到kafka中

    目录 前言 涉及的golang库和可视化工具: 工作的流程 环境准备 代码分层 关键的代码 main.go kafka.go tail.go 前言 日志收集项目的准备中,本文主要讲的是利用golang的tail库,监听日志文件的变动,将日志信息发送到kafka中. 涉及的golang库和可视化工具: go-ini,sarama,tail其中: go-ini:用于读取配置文件,统一管理配置项,有利于后其的维护 sarama:是一个go操作kafka的客户端.目前我用于向kefka发送消息 tail

  • Nodejs监听日志文件的变化的过程解析

    最近有在做日志文件的分析,其中有一个需求:A服务器项目需要用Nodejs监听日志文件的变化,当项目产生了新的日志信息,将新的部分通过socket传输到B服务器项目.socket暂时不做分析. 这个需求很简单,通过分析我们开始撸码吧. 在撸码的过程中还能巩固所学Nodejs的API,何乐而不为呢? 所用的API fs.watchFile() 语法 fs.watchFile(filename[, options], listener) 参数解析 filename <string> | <Bu

  • oracle 11g的警告日志和监听日志的删除方法

    oracle 11g的监听日志和警告日志都是在/u01/oracle/diag/tnslsnr/oracle/listener目录和/u01/oracle/diag/rdbms/db1/db1目录下都有以下分别简称listener目录和db1目录.这两目录下都有如下目录:alert cdump hm incident incpkg ir lck metadata stage sweep trace 其中警告日志在alert目录下,监听日志在trace目录下.listener目录下产生的日志文件比

  • Oracle监听日志定期清理

    环境: Oracle 11.2.0 Win Server 2008 R2 Enterprise 原因:Oracle监听日志文件大小超过4G,oracle监听连接时断时续 解决办法:重新建立新的日志文件,通过计划任务定期执行,为方便,我这里每天执行一次. 批处理文件内容如下: @echo off rem 因数据库监听日志过大,影响oracle使用,需定期清理 rem 停止监听写日志 lsnrctl set log_status off rem 修改监听日志文件名称,每天执行一次 ren E:\ap

  • 教你3分钟利用原生js实现有进度监听的文件上传预览组件

    前言 本文主要介绍如何使用原生js,通过面向对象的方式实现一个文件上传预览的组件,该组件利用FileReader来实现文件在前端的解析,预览,读取进度等功能,并对外暴露相应api来实现用户自定义的需求,比如文件上传,进度监听,自定义样式,读取成功回调等. 组件设计架构如下: 涉及的核心知识点如下: 闭包:减少变量污染,缩短变量查找范围 自执行函数 file API:对文件进行读取,解析,监控文件事件 DocumentFragment API:主要用来优化dom操作 minix :用来实现对象混合

  • React实现监听粘贴事件并获取粘贴板中的截图

    目录 监听粘贴事件并获取粘贴板中的截图 TSX中给组件添加监听粘贴事件 从粘贴板获取截图文件 React监听事件 事件监听 绑定的事件函数相关 扩展 监听粘贴事件并获取粘贴板中的截图 TSX中给组件添加监听粘贴事件 const pasteImageRef = useRef<HTMLDivElement>(null); useEffect(()=>{     //给组件添加监听粘贴事件     pasteImageRef.current?.addEventListener('paste',

  • NodeJS Web应用监听sock文件实例

    像 NodeJS 写的 TCP 服务可以监听在某个 sock 文件(Domain Socket) 上,它的 HTTP 服务也能这么干.虽然作为 HTTP 服务连接某个 sock 文件的意义不大,所以这里只算是一个纯粹的尝试. TCP 服务是这样写 复制代码 代码如下: var net = require('net'); net.createServer(function (socket) {   socket.on('data', function (data) {     socket.wri

随机推荐