Go语言实现分布式锁

目录
  • 1. go实现分布式锁
    • 1.1 redis_lock.go
    • 1.2 retry.go
    • 1.3 lock.lua
    • 1.4 lua_unlock.lua
    • 1.5 refresh.lua
    • 1.6 单元测试

1. go实现分布式锁

通过 golang 实现一个简单的分布式锁,包括锁续约、重试机制、singleflght机制的使用

1.1 redis_lock.go

package redis_lock
import (
	"context"
	_ "embed"
	"errors"
	"github.com/go-redis/redis/v9"
	"github.com/google/uuid"
	"golang.org/x/sync/singleflight"
	"time"
)
// go:embed 可以直接解析出文件中的字符串
var (
	//go:embed lua_unlock.lua
	luaUnlock string
	//go:embed refresh.lua
	luaRefresh string
	//go:embed lock.lua
	luaLock string
	//定义好两个异常信息
	ErrLockNotHold         = errors.New("未持有锁")
	ErrFailedToPreemptLock = errors.New("加锁失败")
)
type Client struct {
	//采用公共的接口,后续实例通过传入的方式
	client redis.Cmdable
	// singleflight 用于在一个实例的多个携程中只需要竞争出一个携程
	s singleflight.Group
}
func NewClient(c redis.Cmdable) *Client {
	return &Client{
		client: c,
	}
}
func (c *Client) SingleflightLock(ctx context.Context,
	key string,
	expire time.Duration,
	retry RetryStrategy,
	timeout time.Duration) (*Lock, error) {
	for {
		flag := false
		resCh := c.s.DoChan(key, func() (interface{}, error) {
			flag = true
			return c.Lock(ctx, key, expire, retry, timeout)
		})
		select {
		case res := <-resCh:
			if flag {
				if res.Err != nil {
					return nil, res.Err
				}
				//返回锁对象
				return res.Val.(*Lock), nil
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}
//Lock 加锁方法,根据重试机制进行重新获取
func (c *Client) Lock(ctx context.Context,
	key string,
	expire time.Duration,
	retry RetryStrategy,
	timeout time.Duration) (*Lock, error) {
	var timer *time.Timer
	defer func() {
		if timer != nil {
			timer.Stop()
		}
	}()
	for {
		//设置超时
		lct, cancel := context.WithTimeout(ctx, timeout)
		//获取到uuid
		value := uuid.New().String()
		//执行lua脚本进行加锁
		result, err := c.client.Eval(lct, luaLock, []string{key}, value, expire).Bool()
		//用于主动释放资源
		cancel()
		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return nil, err
		}
		if result {
			return newLock(c.client, key, value), nil
		}
		//可以不传重试机制
		if retry != nil {
			//通过重试机制获取重试的策略
			interval, ok := retry.Next()
			if !ok {
				//不用重试
				return nil, ErrFailedToPreemptLock
			}
			if timer == nil {
				timer = time.NewTimer(interval)
			}
			timer.Reset(interval)
			select {
			case <-timer.C: //睡眠时间超时了
				return nil, ctx.Err()
			case <-ctx.Done(): //整个调用的超时
				return nil, ctx.Err()
			}
		}
	}
}
// TryLock 尝试加锁
func (c *Client) TryLock(ctx context.Context, key string, expire time.Duration) (*Lock, error) {
	return c.Lock(ctx, key, expire, nil, 0)
}
// NewLock 创建一个锁结构体
func newLock(client redis.Cmdable, key string, value string) *Lock {
	return &Lock{
		client:     client,
		key:        key,
		value:      value,
		unLockChan: make(chan struct{}, 1), //设置1个缓存数据,用于解锁的信号量
	}
}
// Lock 结构体对象
type Lock struct {
	client redis.Cmdable
	key    string
	value  string
	expire time.Duration
	//在解锁成功之后发送信号来取消续约
	unLockChan chan struct{}
}
// AutoRefresh 自动续约
func (l *Lock) AutoRefresh(interval time.Duration, timeout time.Duration) error {
	//设计一个管道,如果失败了,就发送数据到管道之中,通知进行重试
	retry := make(chan struct{}, 1)
	//方法返回时关闭close
	defer close(retry)
	ticker := time.NewTicker(interval)
	for {
		select {
		//接收到结束的信号时,直接return
		case <-l.unLockChan:
			return nil
		//监听重试的管道
		case <-retry:
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			err := l.Refresh(ctx)
			//主动调用释放资源
			cancel()
			if err == context.DeadlineExceeded {
				// 执行重试往管道中发送一个信号
				retry <- struct{}{}
				continue
			}
			if err != nil {
				return err
			}
		case <-ticker.C:
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			err := l.Refresh(ctx)
			//主动调用释放资源
			cancel()
			if err == context.DeadlineExceeded {
				// 执行重试往管道中发送一个信号
				retry <- struct{}{}
				continue
			}
			if err != nil {
				return err
			}
		}
	}
}
// Refresh 续约
func (l *Lock) Refresh(ctx context.Context) error {
	//执行lua脚本,对锁进行续约
	i, err := l.client.Eval(ctx, luaRefresh, []string{l.key}, l.value, l.expire.Milliseconds()).Int64()
	if err == redis.Nil {
		return ErrLockNotHold
	}
	if err != nil {
		return err
	}
	if i == 0 {
		return ErrLockNotHold
	}
	return nil
}
// Unlock 解锁
func (l *Lock) Unlock(ctx context.Context) error {
	//解锁时,退出方法需要发送一个信号让自动续约的goroutine停止
	defer func() {
		l.unLockChan <- struct{}{}
		close(l.unLockChan)
	}()
	//判断返回的结果
	result, err := l.client.Eval(ctx, luaUnlock, []string{l.key}, l.value).Int64()
	if err == redis.Nil {
		return ErrLockNotHold
	}
	if err != nil {
		return err
	}
	//lua脚本返回的结果如果为0,也是代表当前锁不是自己的
	if result == 0 {
		return ErrLockNotHold
	}
	return nil
}

1.2 retry.go

package redis_lock
import "time"
// RetryStrategy 重试策略
type RetryStrategy interface {
	// Next 下一次重试的时间是多久,返回两个参数 time 时间,bool 是否直接重试
	Next() (time.Duration, bool)
}

1.3 lock.lua

lua脚本原子化加锁

--[[ 获取到对应的value是否跟当前的一样 ]]
if redis.call("get", KEYS[1]) == ARGV[1]
then
-- 如果一样直接对其时间进行续约
	return redis.call("pexpire", KEYS[1], ARGV[2])
else
-- 如果不一样调用setnx命令对其进行设置值
	return redis.call("set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])

1.4 lua_unlock.lua

lua脚本原子化解锁

if redis.call("get", KEYS[1]) == ARGV[1] then
	-- 返回0,代表key不在
	return redis.call("del", KEYS[1])
else
	-- key在,但是值不对
	return 0
end

1.5 refresh.lua

lua脚本续约

if redis.call("get", KEYS[1]) == ARGV[1] then
	-- 返回0,代表key不在
	return redis.call("pexpire", KEYS[1], ARGV[2])
else
	-- key在,但是值不对
	return 0
end

1.6 单元测试

使用go-mock工具生成本地的单元测试,不需要再单独的搭建一个 redis 的服务端

项目根目录下安装mockgen工具

go install github.com/golang/mock/mockgen@latest

添加依赖

go get github.com/golang/mock/mockgen/model

生成redis客户端接口

mockgen -package=mocks -destination=mocks/redis_cmdable.mock.go github.com/go-redis/redis/v9 Cmdable

  • package:指定包
  • destination:生成路径名称
  • 剩下的是指定使用redis包下面的 Cmdable接口生成代码

测试类

func TestClient_TryLock(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	testCase := []struct {
		//测试的场景
		name string
		//输入
		key        string
		expiration time.Duration
		//返回一个mock数据
		mock func() redis.Cmdable
		//期望的返回的错误值
		wantError error
		//期望返回的锁
		wantLock *Lock
	}{
		{
			name:       "locked",
			key:        "locked-key",
			expiration: time.Minute,
			mock: func() redis.Cmdable {
				rdb := mocks.NewMockCmdable(ctrl)
				res := redis.NewBoolResult(true, nil)
				i := []interface{}{gomock.Any(), time.Minute}
				rdb.EXPECT().Eval(gomock.Any(), luaLock, []string{"locked-key"}, i...).Return(res)
				return rdb
			},
			wantLock: &Lock{
				key: "locked-key",
			},
		},
	}
	for _, tc := range testCase {
		t.Run(tc.name, func(t *testing.T) {
			var c = NewClient(tc.mock())
			l, err := c.TryLock(context.Background(), tc.key, tc.expiration)
			assert.Equal(t, tc.wantError, err)
			if err != nil {
				return
			}
			//判断返回的key是否跟期望的一样
			assert.Equal(t, tc.key, l.key)
			assert.Equal(t, tc.wantLock.key, l.key)
			assert.NotEmpty(t, l.value)
		})
	}
}

到此这篇关于Go语言实现分布式锁的文章就介绍到这了,更多相关Go分布式锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • GoLang分布式锁与snowflake雪花算法

    目录 分布式id生成器 分布式锁 负载均衡 go语言在网络服务模块有着得天独厚的优势:传送门详细介绍了涉及到的分布式相关技术. 分布式id生成器 Snowflake(雪花算法),由Twitter提出并开源,可在分布式环境下用于生成唯一ID的算法. 生成的Id是64位(int64)数值类型,包含4部分: 41bit的时间戳(毫秒):一般是相对系统上线时间的毫秒数(可用69年): 5bit的数据中心id+5bit的机器id:表示工作的计算机:实际使用时可根据情况调整两者间的比例: 12bit序列号:

  • Golang分布式锁简单案例实现流程

    其实锁这种东西,都能能不加就不加,锁会导致程序一定程度上退回到串行化,进而降低效率. 首先,看一个案例,如果要实现一个计数器,并且是多个协程共同进行的,就会出现以下的情况: package main import ( "fmt" "sync" ) func main() { numberFlag := 0 wg := new(sync.WaitGroup) for i := 0; i < 200; i++ { wg.Add(1) go func() { def

  • 详解一种用django_cache实现分布式锁的方式

    问题背景 在项目开发过程中,我遇到一个需求:对于某条记录,一个用户对它进行操作时会持续比较久,希望在一个用户的操作期间,不允许有另一个用户操作它,否容易会出现混乱. 在与同事们讨论后,想通过加锁的方式,起初想用redis锁,但这样会为项目增加别的依赖,因此转而使用django-cache的缓存数据库,来实现该功能. 资料查找 基于缓存实现分布式锁,在网络上查找了实现方式,大概可以总结为以下3种: 第一种锁命令INCR 这种加锁的思路是, key 不存在,那么 key 的值会先被初始化为 0 ,然

  • 用Go+Redis实现分布式锁的示例代码

    目录 为什么需要分布式锁 分布式锁需要具备特性 实现 Redis 锁应先掌握哪些知识点 set 命令 Redis.lua 脚本 go-zero 分布式锁 RedisLock 源码分析 关于分布式锁还有哪些实现方案 项目地址 为什么需要分布式锁 用户下单 锁住 uid,防止重复下单. 库存扣减 锁住库存,防止超卖. 余额扣减 锁住账户,防止并发操作. 分布式系统中共享同一个资源时往往需要分布式锁来保证变更资源一致性. 分布式锁需要具备特性 排他性 锁的基本特性,并且只能被第一个持有者持有. 防死锁

  • Golang分布式锁详细介绍

    目录 进程内加锁 trylock 基于redis的setnx 基于zk 基于etcd redlock 如何选择 在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区.为什么需要加锁呢?可以看看下段代码: package main import ( "sync" ) // 全局变量 var counter int func main() { var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go f

  • go如何利用orm简单实现接口分布式锁

    在开发中有些敏感接口,例如用户余额提现接口,需要考虑在并发情况下接口是否会发生问题.如果用户将自己的多条提现请求同时发送到服务器,代码能否扛得住呢?一旦没做锁,那么就真的会给用户多次提现,给公司带来损失.我来简单介绍一下在这种接口开发过程中,我的做法. 第一阶段: 我们使用的orm为xorm,提现表对应的结构体如下 type Participating struct { ID uint `xorm:"autoincr id" json:"id,omitempty"`

  • Go结合Redis用最简单的方式实现分布式锁

    目录 前言 单Redis实例场景 加解锁示例 小结 多Redis实例场景 加解锁示例 小结 总结 前言 在项目中我们经常有需要使用分布式锁的场景,而Redis是实现分布式锁最常见的一种方式,并且我们也都希望能够把代码写得简单一点,所以今天我们尽量用最简单的方式来实现. 下面的代码使用go-redis客户端和gofakeit,参考和引用了Redis官方文章 单Redis实例场景 如果熟悉Redis的命令,可能会马上想到使用Redis的set if not exists操作来实现,并且现在标准的实现

  • go 分布式锁简单实现实例详解

    目录 正文 案例 资源加锁 使用redis来实现分布式锁 redis lua保证原子性 正文 其实锁这种东西,都能能不加就不加,锁会导致程序一定程度上退回到串行化,进而降低效率. 案例 首先,看一个案例,如果要实现一个计数器,并且是多个协程共同进行的,就会出现以下的情况: package main import ( "fmt" "sync" ) func main() { numberFlag := 0 wg := new(sync.WaitGroup) for i

  • Go 语言下基于Redis分布式锁的实现方式

    分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁. 项目地址: https://github.com/Spongecaptain/redisLock 1. Go 原生的互斥锁 Go 原生的互斥锁即 sync 包下的 M

  • Go语言实现分布式锁

    目录 1. go实现分布式锁 1.1 redis_lock.go 1.2 retry.go 1.3 lock.lua 1.4 lua_unlock.lua 1.5 refresh.lua 1.6 单元测试 1. go实现分布式锁 通过 golang 实现一个简单的分布式锁,包括锁续约.重试机制.singleflght机制的使用 1.1 redis_lock.go package redis_lock import ( "context" _ "embed" &quo

  • java语言描述Redis分布式锁的正确实现方式

    分布式锁一般有三种实现方式:1.数据库乐观锁:2.基于Redis的分布式锁:3.基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁. 可靠性 首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 互斥性.在任意时刻,只有一个客户端能持有锁. 不会发生死锁.即使有一个客户端在持有锁的期间

  • 基于Redis实现分布式锁以及任务队列

    一.前言 双十一刚过不久,大家都知道在天猫.京东.苏宁等等电商网站上有很多秒杀活动,例如在某一个时刻抢购一个原价1999现在秒杀价只要999的手机时,会迎来一个用户请求的高峰期,可能会有几十万几百万的并发量,来抢这个手机,在高并发的情形下会对数据库服务器或者是文件服务器应用服务器造成巨大的压力,严重时说不定就宕机了,另一个问题是,秒杀的东西都是有量的,例如一款手机只有10台的量秒杀,那么,在高并发的情况下,成千上万条数据更新数据库(例如10台的量被人抢一台就会在数据集某些记录下 减1),那次这个

  • 浅谈Redis分布式锁的正确实现方式

    前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁. 可靠性 首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 1.互斥性.在任意时刻,只有一个客户端能持有锁. 2.不会发生死锁.即使有一个

  • Java Redis分布式锁的正确实现方式详解

    前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁. 可靠性 首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 互斥性.在任意时刻,只有一个客户端能持有锁. 不会发生死锁.即使有一个客户端在

  • Python使用分布式锁的代码演示示例

    在计算机并发领域编程中总是会与锁打交道,锁又有很多种,互斥锁.自旋锁等等. 锁总是伴随着线程.进程这样的词汇出现,阮一峰有 一篇文章 对这些名词进行了简单易懂的解释. 我的理解是,使用线程.进程是为了实现并发从而获得性能的提升(利用多核CPU,多台服务器),但这种并发由于调度的不确定性,很容易出乱子,为了(在一些共享资源.关键节点上)不出乱子,又需要对资源加锁,在操作这个资源时控制这种并发,将乱子消灭. 很多语言都提供了一些线程级别的锁实现以及一些相应的工具,但在进程方面就无能为力了.而一个服务

  • Redis分布式锁的正确实现方法总结

    分布式锁一般有三种实现方式: 1.数据库乐观锁: 2.基于Redis的分布式锁: 3.基于ZooKeeper的分布式锁. 本文将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁. 可靠性 首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件: 1.互斥性.在任意时刻,只有一个客户端能持有锁. 2.不会发生死锁.即使有一个客户端在

  • Springboot中如何使用Redisson实现分布式锁浅析

    目录 前言 1. 概述 2. Redisson 在 Springboot 中的使用 2.1 引入依赖 2.2 在 Springboot 配置中配置Redis 2.3 Demo代码 3. 综述 前言 在分布式场景下为了保证数据最终一致性.在单进程的系统中,存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步(lock-synchronized),使其在修改这种变量时能够线性执行消除并发修改变量.但分布式系统是多部署.多进程的,开发语言提供的并发处理API在此场景下就无能为

  • 带你轻松掌握Redis分布式锁

    目录 1. 什么是分布式锁 2. 分布式锁该具备的特性 3. 基于数据库做分布式锁 4. 基于Redis做分布式锁 4.1 超时问题 4.2 可重入锁 4.3 集群环境的缺陷 4.4 Redlock 目前很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题. 基于 CAP理论,任何一个分布式系统都无法同时满足一致性(Consistency).可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项. 我们为

随机推荐