java基于mongodb实现分布式锁的示例代码

目录
  • 原理
  • 实现
  • 使用

原理

通过线程安全findAndModify 实现锁

实现

定义锁存储对象:

/**
 * mongodb 分布式锁
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "distributed-lock-doc")
public class LockDocument {
    @Id
    private String id;
    private long expireAt;
    private String token;
}

定义Lock API:

public interface LockService {

    String acquire(String key, long expiration);

    boolean release(String key, String token);

    boolean refresh(String key, String token, long expiration);
}

获取锁:

  @Override
    public String acquire(String key, long expiration) {
        Query query = Query.query(Criteria.where("_id").is(key));
        String token = this.generateToken();
        Update update = new Update()
            .setOnInsert("_id", key)
            .setOnInsert("expireAt", System.currentTimeMillis() + expiration)
            .setOnInsert("token", token);

        FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
                                                                 .returnNew(true);
        LockDocument doc = mongoTemplate.findAndModify(query, update, options,
                                                       LockDocument.class);
        boolean locked = doc.getToken() != null && doc.getToken().equals(token);

        // 如果已过期
        if (!locked && doc.getExpireAt() < System.currentTimeMillis()) {
            DeleteResult deleted = this.mongoTemplate.remove(
                Query.query(Criteria.where("_id").is(key)
                                    .and("token").is(doc.getToken())
                                    .and("expireAt").is(doc.getExpireAt())),
                LockDocument.class);
            if (deleted.getDeletedCount() >= 1) {
                // 成功释放锁, 再次尝试获取锁
                return this.acquire(key, expiration);
            }
        }

        log.debug("Tried to acquire lock for key {} with token {} . Locked: {}",
                  key, token, locked);
        return locked ? token : null;
    }

原理:

  • 先尝试upsert锁对象,如果成功且token一致,说明拿到锁
  • 否则加锁失败
  • 如果未拿到锁,但是锁已过期,尝试删除锁
    • 如果删除成功,再次尝试拿锁
    • 如果失败,说明锁可能已经续期了

释放和续期锁:

   @Override
   public boolean release(String key, String token) {
       Query query = Query.query(Criteria.where("_id").is(key)
                                         .and("token").is(token));
       DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class);
       boolean released = deleted.getDeletedCount() == 1;
       if (released) {
           log.debug("Remove query successfully affected 1 record for key {} with token {}",
                     key, token);
       } else if (deleted.getDeletedCount() > 0) {
           log.error("Unexpected result from release for key {} with token {}, released {}",
                     key, token, deleted);
       } else {
           log.error("Remove query did not affect any records for key {} with token {}",
                     key, token);
       }

       return released;
   }

   @Override
   public boolean refresh(String key, String token,
                          long expiration) {
       Query query = Query.query(Criteria.where("_id").is(key)
                                         .and("token").is(token));
       Update update = Update.update("expireAt",
                                     System.currentTimeMillis() + expiration);
       UpdateResult updated =
           mongoTemplate.updateFirst(query, update, LockDocument.class);

       final boolean refreshed = updated.getModifiedCount() == 1;
       if (refreshed) {
           log.debug("Refresh query successfully affected 1 record for key {} " +
                     "with token {}", key, token);
       } else if (updated.getModifiedCount() > 0) {
           log.error("Unexpected result from refresh for key {} with token {}, " +
                     "released {}", key, token, updated);
       } else {
           log.warn("Refresh query did not affect any records for key {} with token {}. " +
                    "This is possible when refresh interval fires for the final time " +
                    "after the lock has been released",
                    key, token);
       }

       return refreshed;
   }

使用

private  LockService lockService;

private void tryAcquireLockAndSchedule() {
        while (!this.stopSchedule) {
            // 尝试拿锁
            this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000);
            if (this.token != null) {
    // 拿到锁
            } else {
                // 等待LOCK_EXPIRATION, 再次尝试
                Thread.sleep(LOCK_EXPIRATION);
            }
        }
    }
  • 先尝试拿锁,如果获取到token,说明拿锁成功
  • 否则可以sleep一段时间后再拿锁

完整代码,可到github查看 https://github.com/jadepeng/docker-pipeline/blob/main/pipeline-master/src/main/java/com/github/jadepeng/pipeline/service/impl/MongoLockService.java

到此这篇关于java基于mongodb实现分布式锁的示例代码的文章就介绍到这了,更多相关java mongodb实现分布式锁内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java redisson实现分布式锁原理详解

    Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的. 不同版本实现锁的机制并不相同 引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理. <dependency> <groupId>org.redisson&

  • Java编程redisson实现分布式锁代码示例

    最近由于工作很忙,很长时间没有更新博客了,今天为大家带来一篇有关Redisson实现分布式锁的文章,好了,不多说了,直接进入主题. 1. 可重入锁(Reentrant Lock) Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁. public void testReentrantLock(RedissonClient redisson){ RLock lock = redisson.getL

  • redis中使用java脚本实现分布式锁

    redis被大量用在分布式的环境中,自然而然分布式环境下的锁如何解决,立马成为一个问题.例如我们当前的手游项目,服务器端是按业务模块划分服务器的,有应用服,战斗服等,但是这两个vm都有可能同时改变玩家的属性,这如果在同一个vm下面,就很容易加锁,但如果在分布式环境下就没那么容易了,当然利用redis现有的功能也有解决办法,比如redis的脚本. redis在2.6以后的版本中增加了Lua脚本的功能,可以通过eval命令,直接在RedisServer环境中执行Lua脚本,并且可以在Lua脚本中调用

  • java使用zookeeper实现的分布式锁示例

    使用zookeeper实现的分布式锁 分布式锁,实现了Lock接口 复制代码 代码如下: package com.concurrent; import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeU

  • Java分布式锁的三种实现方案

    方案一:数据库乐观锁 乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0. 异常实现流程 -- 可能会发生的异常情况 -- 线程1查询,当前left_count为1,则有记录 select * from t_bonus

  • Java基于redis实现分布式锁代码实例

    为什么会有这个需求: 例如一个简单用户的操作,一个线程去修改用户状态,首先在在内存中读出用户的状态,然后在内存中进行修改,然后在存到数据库中.在单线程中,这是没有问题的.但是在多线程中由于读取,修改,写入是三个操作,不是原子操作(同时成功或失败),因此在多线程中会存在数据的安全性问题. 这个问题的话,就可以用分布式锁在限制程序的并发执行. 实现思路: 就是进来一个先占位,当别的线程进来操作的时候,发现有人占位了,就会放弃或者稍后再试. 占位的实现: 在redis中的setnx命令来实现,redi

  • 详解Java如何实现基于Redis的分布式锁

    前言 单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式.其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去看看原理,看懂了之后看代码应该就容易理解了. 我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition方法我这里暂时没实现.这个Lock提供了5个lock方法的变体,可以

  • java基于mongodb实现分布式锁的示例代码

    目录 原理 实现 使用 原理 通过线程安全findAndModify 实现锁 实现 定义锁存储对象: /** * mongodb 分布式锁 */ @Data @NoArgsConstructor @AllArgsConstructor @Document(collection = "distributed-lock-doc") public class LockDocument { @Id private String id; private long expireAt; privat

  • java基于jedisLock—redis分布式锁实现示例代码

    分布式锁是啥? 单机锁的概念:我们正常跑的单机项目(也就是在tomcat下跑一个项目不配置集群)想要在高并发的时候加锁很容易就可以搞定,java提供了很多的机制例如:synchronized.volatile.ReentrantLock等锁的机制. 为啥需要分布式锁:当我们的项目比较庞大的时候,单机版的项目已经不能满足吞吐量的需求了,需要对项目做负载均衡,有可能还需要对项目进行解耦拆分成不同的服务,那么肯定是做成分布式的项目,分布式的项目因为是不同的程序控制,所以使用java提供的锁并不能完全保

  • Java基于redis实现分布式锁

    为了保证一个在高并发存场景下只能被同一个线程操作,java并发处理提供ReentrantLock或Synchronized进行互斥控制.但是这仅仅对单机环境有效.我们实现分布式锁大概通过三种方式. redis实现分布式锁 数据库实现分布式锁 zk实现分布式锁 实际上这三种和java对比看属于一类.都是属于程序外部锁. 原理剖析 上述三种分布式锁都是通过各自为依据对各个请求进行上锁,解锁从而控制放行还是拒绝.redis锁是基于其提供的setnx命令. setnx当且仅当key不存在.若给定key已

  • SpringBoot整合Redis正确的实现分布式锁的示例代码

    前言 最近在做分块上传的业务,使用到了Redis来维护上传过程中的分块编号. 每上传完成一个分块就获取一下文件的分块集合,加入新上传的编号,手动接口测试下是没有问题的,前端通过并发上传调用就出现问题了,并发的get再set,就会存在覆盖写现象,导致最后的分块数据不对,不能触发分块合并请求. 遇到并发二话不说先上锁,针对执行代码块加了一个JVM锁之后问题就解决了. 仔细一想还是不太对,项目是分布式部署的,做了负载均衡,一个节点的代码被锁住了,请求轮询到其他节点还是可以进行覆盖写,并没有解决到问题啊

  • SpringBoot集成redis实现分布式锁的示例代码

    1.准备 使用redis实现分布式锁,需要用的setnx(),所以需要集成Jedis 需要引入jar,jar最好和redis的jar版本对应上,不然会出现版本冲突,使用的时候会报异常redis.clients.jedis.Jedis.set(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;I)Ljava/lang/String; 我使用的redis版本是2.3.0,Jedis使用的是3.3.0 <de

  • Java基于IDEA实现http编程的示例代码

    http开发前言之为什么要有应用层 我们已经学过TCP/IP , 已经知道目前数据能从客户端进程经过路径选择跨网络传送到服务器端进程 [ IP+Port ],可是,仅仅把数据从A点传送到B点就完了吗?这就好比,在淘宝上买了一部手机,卖家[ 客户端 ]把手机通过顺丰[ 传送+路径选择 ] 送到买家 [ 服务器 ] 手里就完了吗?当然不是,买家还要使用这款产品,还要在使用之后,给卖家打分评论.所以,我们把数据从A端传送到B端, TCP/IP 解决的是顺丰的功能,而两端还要对数据进行加工处理或者使用,

  • Java基于JNDI 实现读写分离的示例代码

    目录 一.JNDI数据源配置 二.JNDI数据源使用 三.web.xml配置 四.spring-servlet.xml配置 五.spring-db.xml配置 六.log4j.properties配置 七.相关路由数据源切换逻辑代码 八.搭建过程中遇到的问题和解决方案 一.JNDI数据源配置 在Tomcat的conf目录下,context.xml在其中标签中添加如下JNDI配置: <Resource name="dataSourceMaster" factory="or

  • 用Go+Redis实现分布式锁的示例代码

    目录 为什么需要分布式锁 分布式锁需要具备特性 实现 Redis 锁应先掌握哪些知识点 set 命令 Redis.lua 脚本 go-zero 分布式锁 RedisLock 源码分析 关于分布式锁还有哪些实现方案 项目地址 为什么需要分布式锁 用户下单 锁住 uid,防止重复下单. 库存扣减 锁住库存,防止超卖. 余额扣减 锁住账户,防止并发操作. 分布式系统中共享同一个资源时往往需要分布式锁来保证变更资源一致性. 分布式锁需要具备特性 排他性 锁的基本特性,并且只能被第一个持有者持有. 防死锁

  • Java基于LoadingCache实现本地缓存的示例代码

    目录 一. 添加 maven 依赖 二.CacheBuilder 方法说明 三.创建 CacheLoader 四.工具类 五.guava Cache数据移除 一. 添加 maven 依赖 <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>27.1-jre</version> </depend

  • springboot+zookeeper实现分布式锁的示例代码

    目录 依赖 本地封装 配置 测试代码 JMeter测试 InterProcessMutex内部实现了zookeeper分布式锁的机制,所以接下来我们尝试使用这个工具来为我们的业务加上分布式锁处理的功能 zookeeper分布式锁的特点:1.分布式 2.公平锁 3.可重入 依赖 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId>

随机推荐