Go操作Kafka和Etcd方法详解

目录
  • 操作Kafka
    • sarama
    • 下载及安装
    • 注意事项
    • 连接 kafka 发送消息
    • 连接 kafka 消费消息
  • 操作Etcd
    • 安装
    • put和get操作
    • watch操作
    • 安装报错:

操作Kafka

Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点。本文介绍了如何使用 Go 语言发送和接收 kafka 消息。

sarama

Go 语言中连接 kafka 使用第三方库:github.com/Shopify/sar…

下载及安装

go get github.com/Shopify/sarama

注意事项

sarama v1.20 之后的版本加入了zstd压缩算法,需要用到 cgo,在 Windows 平台编译时会提示类似如下错误:

# github.com/DataDog/zstd
exec: "gcc":executable file not found in %PATH%

所以在 Windows 平台请使用 v1.19 版本的 sarama。

连接 kafka 发送消息

package main
import (
	"fmt"
	"github.com/Shopify/sarama"
)
// 基于sarama第三方库开发的kafka client
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

连接 kafka 消费消息

package main
import (
	"fmt"
	"github.com/Shopify/sarama"
)
// kafka consumer
func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // 遍历所有的分区
		// 针对每个分区创建一个对应的分区消费者
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 异步从每个分区消费信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
}

操作Etcd

这里使用官方的etcd/clientv3包来连接etcd并进行相关操作。

安装

go get go.etcd.io/etcd/clientv3

put和get操作

put命令用来设置键值对数据,get命令用来根据key获取值。

package main
import (
	"context"
	"fmt"
	"time"
	"go.etcd.io/etcd/clientv3"
)
func main(){
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"122.51.79.172:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// handle error!
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
	fmt.Println("connect to etcd success")
	defer cli.Close()
	// put
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, "coolops", "test")
	cancel()
	if err != nil {
		fmt.Printf("put to etcd failed, err:%v\n", err)
		return
	}
	// get
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, "coolops")
	cancel()
	if err != nil {
		fmt.Printf("get from etcd failed, err:%v\n", err)
		return
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s:%s\n", ev.Key, ev.Value)
	}
}

watch操作

watch用来获取未来更改的通知。

package main
import (
	"context"
	"fmt"
	"time"
	"go.etcd.io/etcd/clientv3"
)
func main(){
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"122.51.79.172:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// handle error!
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
	fmt.Println("connect to etcd success")
	defer cli.Close()
	// watch 操作,返回的是一个通道
	rch := cli.Watch(context.Background(), "coolops") // <-chan WatchResponse
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

安装报错:

go: finding github.com/coreos/pkg latest
# github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:114:78: undefined: resolver.BuildOption
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:182:31: undefined: resolver.ResolveNowOption
# github.com/coreos/etcd/clientv3/balancer/picker
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\picker\err.go:37:44: undefined: balancer.PickOptions
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\picker\roundrobin_balanced.go:55:54: undefined: balancer.PickOptions

解决: 将go.mod里的prpc改为1.26.0版本

google.golang.org/grpc v1.26.0

以上就是Go操作Kafka和Etcd方法详解的详细内容,更多关于Go操作Kafka Etcd的资料请关注我们其它相关文章!

(0)

相关推荐

  • Go操作etcd的实现示例

    目录 etcdetcd介绍 etcd应用场景 服务发现 配置中心 分布式锁 为什么用 etcd 而不用ZooKeeper? 为什么不选择ZooKeeper? 为什么选择etcd? etcd集群 搭建一个3节点集群示例: Go语言操作etcd 安装 put和get操作 watch操作 基于etcd实现分布式锁 参考链接: etcd是近几年比较火热的一个开源的.分布式的键值对数据存储系统,提供共享配置.服务的注册和发现,本文主要介绍etcd的安装和使用. etcdetcd介绍 etcd是使用Go语言

  • golang如何使用sarama访问kafka

    下面一个客户端代码例子访问kafka服务器,来发送和接受消息. 使用方式 1.命令行参数 $ ./kafkaclient -h Usage of ./client: -ca string CA Certificate (default "ca.pem") -cert string Client Certificate (default "cert.pem") -command string consumer|producer (default "consu

  • Go语言kafka生产消费消息实例搬砖

    目录 kafka go库 注意 例子 kafka go库 kafka go客户端官方目前没有提供,但在github有2个非常流行的库 星星较多,网上案例也多 https://github.com/Shopify/sarama confluent官网提供的库 https://github.com/confluentinc/confluent-kafka-go 这里使用sarama,因为星星多,案例多,方便快速上手 注意 如果kafka版本在2.2以下,需要在go.mod里面将sarama版本改为g

  • golang对etcd存取和数值监测的实现

    测试代码如下 package main import ( "fmt" "log" "time" "go.etcd.io/etcd/clientv3" "golang.org/x/net/context" ) var ( dialTimeout = 5 * time.Second requestTimeout = 2 * time.Second endpoints = []string{"192.1

  • golang连接kafka消费进ES操作

    1.首先初始化conf配置把kafka和ES的地址配置好还有一个日志方便查看 配置信息如下 用到的库是 github.com/astaxie/beego/config [logs] log_level = debug log_path = "./logs/log_transfer.log" [kafka] server_addr = 192.168.0.134:9092 topic = nginx_log [ES] addr = http://192.168.0.134:9200/ 2

  • Golang如何将日志以Json格式输出到Kafka

    目录 格式化接口 普通文本格式化器 Json文本格式化器 写日志接口 写日志到文件 写日志到Kafka 接口的组装 如何提高日志处理的吞吐量 在上一篇文章中我实现了一个支持Debug.Info.Error等多个级别的日志库,并将日志写到了磁盘文件中,代码比较简单,适合练手.有兴趣的可以通过这个链接前往:https://github.com/bosima/ylog/releases/tag/v1.0.1 工程实践中,我们往往还需要对日志进行采集,将日志归集到一起,然后用于各种处理分析,比如生产环境

  • Go+Kafka实现延迟消息的实现示例

    目录 前言 原理 简单的实现 生产者 延迟服务 消费者 改进点 通用的延迟服务 生产者负责延迟服务 总结 前言 延迟队列是一个非常有用的工具,我们经常遇到需要使用延迟队列的场景,比如延迟通知,订单关闭等等. 这篇文章主要是使用Go+Kafka实现延迟消息. 使用了sarama客户端. 原理 Kafka实现延迟消息分为下面三步: 生产者把消息发送到延迟队列 延迟服务把延迟队列里超过延迟时间的消息写入真实队列 消费者消费真实队列里的消息 简单的实现 生产者 生产者只是把消息发送到延迟队列 msg :

  • Golang监听日志文件并发送到kafka中

    目录 前言 涉及的golang库和可视化工具: 工作的流程 环境准备 代码分层 关键的代码 main.go kafka.go tail.go 前言 日志收集项目的准备中,本文主要讲的是利用golang的tail库,监听日志文件的变动,将日志信息发送到kafka中. 涉及的golang库和可视化工具: go-ini,sarama,tail其中: go-ini:用于读取配置文件,统一管理配置项,有利于后其的维护 sarama:是一个go操作kafka的客户端.目前我用于向kefka发送消息 tail

  • Go操作Kafka和Etcd方法详解

    目录 操作Kafka sarama 下载及安装 注意事项 连接 kafka 发送消息 连接 kafka 消费消息 操作Etcd 安装 put和get操作 watch操作 安装报错: 操作Kafka Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能.持久化.多副本备份.横向扩展等特点.本文介绍了如何使用 Go 语言发送和接收 kafka 消息. sarama Go 语言中连接 kafka 使用第三方库:github.com/Shopify

  • NodeJs 文件系统操作模块fs使用方法详解

    NodeJs:文件读取API使用方法 - - readFile() 需求:使用Node中提供的文件操作API,读取files目录下的1.txt 文档中文本内容. Node的三个组成部分:ECMAScript核心 + 全局成员 + 核心API成员 核心API成员,在大家安装Node应用程序的时候,就已经安装到了自己的电脑中. 如果想要访问核心成员,直接使用require("核心成员的名称"),就能够导入并使用这些核心成员. const fs = require("fs"

  • nodejs环境快速操作mysql数据库的方法详解

    github地址https://github.com/dmhsq/dmhsq-mysql-db 可用于腾讯云SCF以及云开发环境 错误处理尚未完善 错误参考mysql错误 引入依赖包 npm install dmhsq-mysql-db 效果如下 简化了mysql的使用 安装依赖 npm install dmhsq-mysql-db 使用示例 快速操作mysql 错误处理尚未完善 部分错误参考mysql错误 引入资源 const database = require("dmhsq-mysql-d

  • Python自动操作Excel文件的方法详解

    目录 工具 读取Excel文件内容 写入Excel文件内容 Excel文件样式调整 设置表头的位置 设置单元格的宽高 总结 工具 python3.7 Pycharm Excel xlwt&xlrd 读取Excel文件内容 当前文件夹下有一个名为“股票数据.xlsx”的Excel文件,可以按照下列代码方式来操作它. import xlrd # 使用xlrd模块的open_workbook函数打开指定Excel文件并获得Book对象(工作簿) wb = xlrd.open_workbook('股票数

  • php操作access数据库的方法详解

    本文实例讲述了php操作access数据库的方法.分享给大家供大家参考,具体如下: 在PHP网站开发中,PHP与Mysql是最好的组合,但是当你想将其他平台的网站移植到PHP平台时,必然遇到移植性的问题,如ASP+ACCESS平台如何移植?首当其冲便是PHP连接Access数据库问题,在不改变数据库的情况下,PHP如何与Access数据库建立连接? PHP提供多种连接数据库解决方案,在此详解如何使用PHP ADOdb.PDO.ODBC与Access数据库建立连接的代码实例,作为抛砖引玉. 准备工

  • Python操作mongodb数据库的方法详解

    本文实例讲述了Python操作mongodb数据库的方法.分享给大家供大家参考,具体如下: 安装pymongo 下载pymongo: https://pypi.python.org/packages/82/26/f45f95841de5164c48e2e03aff7f0702e22cef2336238d212d8f93e91ea8/pymongo-3.4.0.tar.gz#md5=aa77f88e51e281c9f328cea701bb6f3e 安装pymongo: 解压后,cmd进入pymon

  • Python操作SQLite数据库的方法详解

    本文实例讲述了Python操作SQLite数据库的方法.分享给大家供大家参考,具体如下: SQLite简单介绍 SQLite数据库是一款非常小巧的嵌入式开源数据库软件,也就是说没有独立的维护进程,所有的维护都来自于程序本身.它是遵守ACID的关联式数据库管理系统,它的设计目标是嵌入式的,而且目前已经在很多嵌入式产品中使用了它,它占用资源非常的低,在嵌入式设备中,可能只需要几百K的内存就够了.它能够支持Windows/Linux/Unix等等主流的操作系统,同时能够跟很多程序语言相结合,比如 Tc

  • Python操作SQLite数据库的方法详解【导入,创建,游标,增删改查等】

    本文实例讲述了Python操作SQLite数据库的方法.分享给大家供大家参考,具体如下: SQLite简介 SQLite,是一款轻型的数据库,是遵守ACID的关系型数据库管理系统,它包含在一个相对小的C库中.它是D.RichardHipp建立的公有领域项目.它的设计目标是嵌入式的,而且目前已经在很多嵌入式产品中使用了它,它占用资源非常的低,在嵌入式设备中,可能只需要几百K的内存就够了.它能够支持Windows/Linux/Unix等等主流的操作系统,同时能够跟很多程序语言相结合,比如 Tcl.C

  • C#操作注册表的方法详解

    本文实例讲述了C#操作注册表的方法.分享给大家供大家参考,具体如下: 下面我们就来用.NET下托管语言C#注册表操作,主要内容包括:注册表项的创建,打开与删除.键值的创建(设置值.修改),读取和删除.判断注册表项是否存在.判断键值是否存在. 准备工作: 1. 要操作注册表,我们必须要引入必要的命名空间: 复制代码 代码如下: using Microsoft.Win32; 在这个命名空间里面包含了许多注册表相关的类,足够我们使用了~~ 2. 命名空间里面提供了一个类:RegistryKey 利用它

  • PHP实现链式操作的三种方法详解

    本文实例讲述了PHP实现链式操作的三种方法.分享给大家供大家参考,具体如下: 在php中有很多字符串函数,例如要先过滤字符串收尾的空格,再求出其长度,一般的写法是: strlen(trim($str)) 如果要实现类似js中的链式操作,比如像下面这样应该怎么写? $str->trim()->strlen() 下面分别用三种方式来实现: 方法一.使用魔法函数__call结合call_user_func来实现 思想:首先定义一个字符串类StringHelper,构造函数直接赋值value,然后链式

随机推荐