Golang分布式应用之Redis示例详解

目录
  • 正文
  • 分布式锁
    • 运行测试
  • 分布式过滤器
    • 运行测试
  • 分布式限流器
    • 运行测试
  • 其他

正文

Redis作是一个高性能的内存数据库,常被应用于分布式系统中,除了作为分布式缓存或简单的内存数据库还有一些特殊的应用场景,本文结合Golang来编写对应的中间件。

本文所有代码见github.com/qingwave/go…

分布式锁

单机系统中我们可以使用sync.Mutex来保护临界资源,在分布式系统中同样有这样的需求,当多个主机抢占同一个资源,需要加对应的“分布式锁”。

在Redis中我们可以通过setnx命令来实现

  • 如果key不存在可以设置对应的值,设置成功则加锁成功,key不存在返回失败
  • 释放锁可以通过del实现。

主要逻辑如下:

type RedisLock struct {
	client     *redis.Client
	key        string
	expiration time.Duration // 过期时间,防止宕机或者异常
}
func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock {
	return &RedisLock{
		client:     client,
		key:        key,
		expiration: expiration,
	}
}
// 加锁将成功会将调用者id保存到redis中
func (l *RedisLock) Lock(id string) (bool, error) {
	return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()
}
const unLockScript = `
if (redis.call("get", KEYS[1]) == KEYS[2]) then
	redis.call("del", KEYS[1])
	return true
end
return false
`
// 解锁通过lua脚本来保证原子性,只能解锁当前调用者加的锁
func (l *RedisLock) UnLock(id string) error {
	_, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result()
	if err != nil && err != redis.Nil {
		return err
	}
	return nil
}

需要加一个额外的超时时间来防止系统宕机或者异常请求造成的死锁,通过超时时间为最大预估运行时间的2倍。

解锁时通过lua脚本来保证原子性,调用者只会解自己加的锁。避免由于超时造成的混乱,例如:进程A在时间t1获取了锁,但由于执行缓慢,在时间t2锁超时失效,进程B在t3获取了锁,这是如果进程A执行完去解锁会取消进程B的锁。

运行测试

func main() {
    client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	lock := NewLock(client, "counter", 30*time.Second)
    counter := 0
	worker := func(i int) {
		for {
			id := fmt.Sprintf("worker%d", i)
			ok, err := lock.Lock(id)
			log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
			if !ok {
				time.Sleep(100 * time.Millisecond)
				continue
			}
			defer lock.UnLock(id)
			counter++
			log.Printf("worker %d, add counter %d", i, counter)
			break
		}
	}
	wg := sync.WaitGroup{}
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		id := i
		go func() {
			defer wg.Done()
			worker(id)
		}()
	}
	wg.Wait()
}

运行结果,可以看到与sync.Mutex使用效果类似

2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:09 worker 5, add counter 1
2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 4, add counter 2
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 1, add counter 3
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 2, add counter 4
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 3, add counter 5

特别注意的是,在分布式Redis集群中,如果发生异常时(主节点宕机),可能会降低分布式锁的可用性,可以通过强一致性的组件etcd、ZooKeeper等实现。

分布式过滤器

假设要开发一个爬虫服务,爬取百万级的网页,怎么判断某一个网页是否爬取过,除了借助数据库和HashMap,我们可以借助布隆过滤器来做。相比其他方式布隆过滤器占用极低的空间,而且插入查询时间非常快。

布隆过滤器用来判断某个元素是否在集合中,利用BitSet

  • 插入数据时将值进行多次Hash,将BitSet对应位置1
  • 查询时同样进行多次Hash对比所有位上是否为1,如是则存在。

布隆过滤器有一定的误判率,不适合精确查询的场景。另外也不支持删除元素。通常适用于URL去重、垃圾邮件过滤、防止缓存击穿等场景中。

在Redis中,我们可以使用自带的BitSet实现,同样也借助lua脚本的原子性来避免多次查询数据不一致。

const (
	// 插入数据,调用setbit设置对应位
	setScript = `
for _, offset in ipairs(ARGV) do
	redis.call("setbit", KEYS[1], offset, 1)
end
`
	// 查询数据,如果所有位都为1返回true
	getScript = `
for _, offset in ipairs(ARGV) do
	if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
		return false
	end
end
return true
`
)
type BloomFilter struct {
	client *redis.Client
	key    string // 存在redis中的key
	bits   uint // BitSet的大小
	maps   uint // Hash的次数
}
func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter {
	client.Del(context.TODO(), key)
	if maps == 0 {
		maps = 14
	}
	return &BloomFilter{
		key:    key,
		client: client,
		bits:   bits,
		maps:   maps,
	}
}
// 进行多次Hash, 得到位置列表
func (f *BloomFilter) getLocations(data []byte) []uint {
	locations := make([]uint, f.maps)
	for i := 0; i < int(f.maps); i++ {
		val := murmur3.Sum64(append(data, byte(i)))
		locations[i] = uint(val) % f.bits
	}
	return locations
}
func (f *BloomFilter) Add(data []byte) error {
	args := getArgs(f.getLocations(data))
	_, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result()
	if err != nil && err != redis.Nil {
		return err
	}
	return nil
}
func (f *BloomFilter) Exists(data []byte) (bool, error) {
	args := getArgs(f.getLocations(data))
	resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result()
	if err != nil {
		if err == redis.Nil {
			return false, nil
		}
		return false, err
	}
	exists, ok := resp.(int64)
	if !ok {
		return false, nil
	}
	return exists == 1, nil
}
func getArgs(locations []uint) []string {
	args := make([]string, 0)
	for _, l := range locations {
		args = append(args, strconv.FormatUint(uint64(l), 10))
	}
	return args
}

运行测试

func main() {
	bf := NewBloomFilter(client,"bf-test", 2^16, 14)
	exists, err := bf.Exists([]byte("test1"))
	log.Printf("exist %t, err %v", exists, err)
	if err := bf.Add([]byte("test1")); err != nil {
		log.Printf("add err: %v", err)
	}
	exists, err = bf.Exists([]byte("test1"))
	log.Printf("exist %t, err %v", exists, err)
	exists, err = bf.Exists([]byte("test2"))
	log.Printf("exist %t, err %v", exists, err)
// output
// 2022/07/22 10:05:58 exist false, err <nil>
// 2022/07/22 10:05:58 exist true, err <nil>
// 2022/07/22 10:05:58 exist false, err <nil>
}

分布式限流器

golang.org/x/time/rate包中提供了基于令牌桶的限流器,如果要实现分布式环境的限流可以基于Redis Lua脚本实现。

令牌桶的主要原理如下:

  • 假设一个令牌桶容量为burst,每秒按照qps的速率往里面放置令牌
  • 初始时放满令牌,令牌溢出则直接丢弃,请求令牌时,如果桶中有足够令牌则允许,否则拒绝
  • 当burst==qps时,严格按照qps限流;当burst>qps时,可以允许一定的突增流量

这里主要参考了官方rate包的实现,将核心逻辑改为Lua实现。

--- 相关Key
--- limit rate key值,对应value为当前令牌数
local limit_key = KEYS[1]
--- 输入参数
--[[
qps: 每秒请求数;
burst: 令牌桶容量;
now: 当前Timestamp;
cost: 请求令牌数;
max_wait: 最大等待时间
--]]
local qps = tonumber(ARGV[1])
local burst = tonumber(ARGV[2])
local now = ARGV[3]
local cost = tonumber(ARGV[4])
local max_wait = tonumber(ARGV[5])
--- 获取redis中的令牌数
local tokens = redis.call("hget", limit_key, "token")
if not tokens then
	tokens = burst
end
--- 上次修改时间
local last_time = redis.call("hget", limit_key, "last_time")
if not last_time then
	last_time = 0
end
--- 最新等待时间
local last_event = redis.call("hget", limit_key, "last_event")
if not last_event then
	last_event = 0
end
--- 通过当前时间与上次修改时间的差值,qps计算出当前时间得令牌数
local delta = math.max(0, now-last_time)
local new_tokens = math.min(burst, delta * qps + tokens)
new_tokens = new_tokens - cost --- 最新令牌数,减少请求令牌
--- 如果最新令牌数小于0,计算需要等待的时间
local wait_period = 0
if new_tokens < 0 and qps > 0 then
	wait_period = wait_period - new_tokens / qps
end
wait_period = math.ceil(wait_period)
local time_act = now + wait_period --- 满足等待间隔的时间戳
--- 允许请求有两种情况
--- 当请求令牌数小于burst, 等待时间不超过最大等待时间,可以通过补充令牌满足请求
--- qps为0时,只要最新令牌数不小于0即可
local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)
--- 设置对应值
if ok then
	redis.call("set", limit_key, new_tokens)
	redis.call("set", last_time_key, now)
	redis.call("set", last_event_key, time_act)
end
--- 返回列表,{是否允许, 等待时间}
return {ok, wait_period}

在Golang中的相关接口Allow、AllowN、Wait等都是通过调用reserveN实现

// 调用lua脚本
func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
	// ...
	res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
	if err != nil && err != redis.Nil {
		return nil, err
	}
	//...
	return &Reservation{
		ok:        allow == 1,
		lim:       lim,
		tokens:    n,
		timeToAct: now.Add(time.Duration(wait) * time.Second),
	}, nil
}

运行测试

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
	if err != nil {
		log.Fatal(err)
	}
	r.Reset()
	for i := 0; i < 5; i++ {
		err := r.Wait(context.TODO())
		log.Printf("worker %d allowed: %v", i, err)
	}
}
// output
// 2022/07/22 12:50:31 worker 0 allowed: <nil>
// 2022/07/22 12:50:31 worker 1 allowed: <nil>
// 2022/07/22 12:50:32 worker 2 allowed: <nil>
// 2022/07/22 12:50:33 worker 3 allowed: <nil>
// 2022/07/22 12:50:34 worker 4 allowed: <nil>

前两个请求在burst内,直接可以获得,后面的请求按照qps的速率生成。

其他

除此之外,Redis还可以用作全局计数、去重(set)、发布订阅等场景。Redis官方也提供了一些通用模块,通过加载这些模块也可以实现过滤、限流等特性,参考modules

参考

https://go-zero.dev/

https://github.com/qingwave/gocorex

以上就是Golang分布式应用之Redis示例详解的详细内容,更多关于Go分布式Redis的资料请关注我们其它相关文章!

(0)

相关推荐

  • redis分布式锁的go-redis实现方法详解

    在分布式的业务中 , 如果有的共享资源需要安全的被访问和处理 , 那就需要分布式锁 分布式锁的几个原则; 1.「锁的互斥性」:在分布式集群应用中,共享资源的锁在同一时间只能被一个对象获取. 2. 「可重入」:为了避免死锁,这把锁是可以重入的,并且可以设置超时. 3. 「高效的加锁和解锁」:能够高效的加锁和解锁,获取锁和释放锁的性能也好. 4. 「阻塞.公平」:可以根据业务的需要,考虑是使用阻塞.还是非阻塞,公平还是非公平的锁. redis实现分布式锁主要靠setnx命令 1. 当key存在时失败

  • Go 语言下基于Redis分布式锁的实现方式

    分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁. 项目地址: https://github.com/Spongecaptain/redisLock 1. Go 原生的互斥锁 Go 原生的互斥锁即 sync 包下的 M

  • 用Go+Redis实现分布式锁的示例代码

    目录 为什么需要分布式锁 分布式锁需要具备特性 实现 Redis 锁应先掌握哪些知识点 set 命令 Redis.lua 脚本 go-zero 分布式锁 RedisLock 源码分析 关于分布式锁还有哪些实现方案 项目地址 为什么需要分布式锁 用户下单 锁住 uid,防止重复下单. 库存扣减 锁住库存,防止超卖. 余额扣减 锁住账户,防止并发操作. 分布式系统中共享同一个资源时往往需要分布式锁来保证变更资源一致性. 分布式锁需要具备特性 排他性 锁的基本特性,并且只能被第一个持有者持有. 防死锁

  • Golang分布式应用定时任务示例详解

    目录 正文 最小堆 时间轮 总结 正文 在系统开发中,有一类任务不是立即执行,而是在未来某个时间点或者按照一定间隔去执行,比如日志定期压缩.报表制作.过期数据清理等,这就是定时任务. 在单机中,定时任务通常需要实现一个类似crontab的系统,一般有两种方式: 最小堆,按照任务执行时间建堆,每次取最近的任务执行 时间轮,将任务放到时间轮列表中,每次转动取对应的任务列表执行 最小堆 最小堆是一种特殊的完全二叉树,任意非叶子节点的值不大于其子节点,如图 通过最小堆,根据任务最近执行时间键堆,每次取堆

  • Go结合Redis用最简单的方式实现分布式锁

    目录 前言 单Redis实例场景 加解锁示例 小结 多Redis实例场景 加解锁示例 小结 总结 前言 在项目中我们经常有需要使用分布式锁的场景,而Redis是实现分布式锁最常见的一种方式,并且我们也都希望能够把代码写得简单一点,所以今天我们尽量用最简单的方式来实现. 下面的代码使用go-redis客户端和gofakeit,参考和引用了Redis官方文章 单Redis实例场景 如果熟悉Redis的命令,可能会马上想到使用Redis的set if not exists操作来实现,并且现在标准的实现

  • Golang分布式应用之Redis示例详解

    目录 正文 分布式锁 运行测试 分布式过滤器 运行测试 分布式限流器 运行测试 其他 正文 Redis作是一个高性能的内存数据库,常被应用于分布式系统中,除了作为分布式缓存或简单的内存数据库还有一些特殊的应用场景,本文结合Golang来编写对应的中间件. 本文所有代码见github.com/qingwave/go… 分布式锁 单机系统中我们可以使用sync.Mutex来保护临界资源,在分布式系统中同样有这样的需求,当多个主机抢占同一个资源,需要加对应的“分布式锁”. 在Redis中我们可以通过s

  • Golang信号量设计实现示例详解

    目录 开篇 信号量 semaphore 扩展库实现 Acquire Release TryAcquire 总结 开篇 在我们此前的文章 Golang Mutex 原理解析 中曾提到过,Mutex 的底层结构包含了两个字段,state 和 sema: type Mutex struct { state int32 sema uint32 } state 代表互斥锁的状态,比如是否被锁定: sema 表示信号量,协程阻塞会等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程. 这个 sema

  • Golang中的参数传递示例详解

    前言 本文主要给大家介绍了关于Golang参数传递的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 关于参数传递,Golang文档中有这么一句: after they are evaluated, the parameters of the call are passed by value to the function and the called function begins execution. 函数调用参数均为值传递,不是指针传递或引用传递.经测试引申出来,

  • Golang字符串变位词示例详解

    实现目标 本文的目标是写出一个函数 anagram(s, t) 去判断两个字符串是否是颠倒字母顺序构成的.下面话不多说了,来一起看看详细的介绍吧. GoLang 实现 func solution(s , t string)bool{ if s == t { return true } length := len(s) if length != len(t) { return false } //' ' 32 --> ~ 126 const MAX_ASCII int= 94 const SPAC

  • Golang 官方依赖注入工具wire示例详解

    目录 依赖注入是什么 开源选型 wire providers injectors 类型区分 总结 依赖注入是什么 Dependency Injection is the idea that your components (usually structs in go) should receive their dependencies when being created. 在 Golang 中,构造一个结构体常见的有两种方式: 在结构体初始化过程中,构建它的依赖: 将依赖作为构造器入参,传入进

  • 基于gin的golang web开发:路由示例详解

    Gin是一个用Golang编写的HTTP网络框架.它的特点是类似于Martini的API,性能更好.在golang web开发领域是一个非常热门的web框架. 启动一个Gin web服务器 使用下面的命令安装Gin go get -u github.com/gin-gonic/gin 在代码里添加依赖 import "github.com/gin-gonic/gin" 快速启动一个Gin服务器的代码如下 package main import "github.com/gin-

  • golang中的三个点 '...'的用法示例详解

    '-' 其实是go的一种语法糖. 它的第一个用法主要是用于函数有多个不定参数的情况,可以接受多个不确定数量的参数. 第二个用法是slice可以被打散进行传递. 下面直接上例子: func test1(args ...string) { //可以接受任意个string参数 for _, v:= range args{ fmt.Println(v) } } func main(){ var strss= []string{ "qwr", "234", "yui

  • golang编程开发使用sort排序示例详解

    golang sort package: https://studygolang.com/articles/3360 sort 操作的对象通常是一个 slice,需要满足三个基本的接口,并且能够使用整数来索引 // A type, typically a collection, that satisfies sort.Interface can be // sorted by the routines in this package. The methods require that the /

  • golang gorm更新日志执行SQL示例详解

    目录 1. 更新日志 1.1. v1.0 1.1.1. 破坏性变更 gorm执行sql 1. 更新日志 1.1. v1.0 1.1.1. 破坏性变更 gorm.Open返回类型为*gorm.DB而不是gorm.DB 更新只会更新更改的字段 大多数应用程序不会受到影响,只有当您更改回调中的更新值(如BeforeSave,BeforeUpdate)时,应该使用scope.SetColumn,例如: func (user *User) BeforeUpdate(scope *gorm.Scope) {

随机推荐