浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

通过zookeeper实现分布式锁

1、创建zookeeper的client

首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client

public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean {
 private static final Logger LOGGER = LoggerFactory.getLogger(ContractFileInfoController.class);

 private String connectionString;
 private int sessionTimeoutMs;
 private int connectionTimeoutMs;
 private RetryPolicy retryPolicy;
 private CuratorFramework client;

 public CuratorFactoryBean(String connectionString) {
  this(connectionString, 500, 500);
 }

 public CuratorFactoryBean(String connectionString, int sessionTimeoutMs, int connectionTimeoutMs) {
  this.connectionString = connectionString;
  this.sessionTimeoutMs = sessionTimeoutMs;
  this.connectionTimeoutMs = connectionTimeoutMs;
 }

 @Override
 public void destroy() throws Exception {
  LOGGER.info("Closing curator framework...");
  this.client.close();
  LOGGER.info("Closed curator framework.");
 }

 @Override
 public CuratorFramework getObject() throws Exception {
  return this.client;
 }

 @Override
 public Class<?> getObjectType() {
   return this.client != null ? this.client.getClass() : CuratorFramework.class;
 }

 @Override
 public boolean isSingleton() {
  return true;
 }

 @Override
 public void afterPropertiesSet() throws Exception {
  if (StringUtils.isEmpty(this.connectionString)) {
   throw new IllegalStateException("connectionString can not be empty.");
  } else {
   if (this.retryPolicy == null) {
    this.retryPolicy = new ExponentialBackoffRetry(1000, 2147483647, 180000);
   }

   this.client = CuratorFrameworkFactory.newClient(this.connectionString, this.sessionTimeoutMs, this.connectionTimeoutMs, this.retryPolicy);
   this.client.start();
   this.client.blockUntilConnected(30, TimeUnit.MILLISECONDS);
  }
 }
 public void setConnectionString(String connectionString) {
  this.connectionString = connectionString;
 }

 public void setSessionTimeoutMs(int sessionTimeoutMs) {
  this.sessionTimeoutMs = sessionTimeoutMs;
 }

 public void setConnectionTimeoutMs(int connectionTimeoutMs) {
  this.connectionTimeoutMs = connectionTimeoutMs;
 }

 public void setRetryPolicy(RetryPolicy retryPolicy) {
  this.retryPolicy = retryPolicy;
 }

 public void setClient(CuratorFramework client) {
  this.client = client;
 }
}

2、封装分布式锁

根据CuratorFramework创建InterProcessMutex(分布式可重入排它锁)对一行数据进行上锁

 public InterProcessMutex(CuratorFramework client, String path) {
  this(client, path, new StandardLockInternalsDriver());
 }

使用 acquire方法
1、acquire() :入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(long time, TimeUnit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。

 public void acquire() throws Exception {
  if (!this.internalLock(-1L, (TimeUnit)null)) {
   throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
  }
 }

 public boolean acquire(long time, TimeUnit unit) throws Exception {
  return this.internalLock(time, unit);
 }

释放锁 mutex.release();

public void release() throws Exception {
  Thread currentThread = Thread.currentThread();
  InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
  if (lockData == null) {
   throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
  } else {
   int newLockCount = lockData.lockCount.decrementAndGet();
   if (newLockCount <= 0) {
    if (newLockCount < 0) {
     throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
    } else {
     try {
      this.internals.releaseLock(lockData.lockPath);
     } finally {
      this.threadData.remove(currentThread);
     }

    }
   }
  }
 }

封装后的DLock代码
1、调用InterProcessMutex processMutex = dLock.mutex(path);

2、手动释放锁processMutex.release();

3、需要手动删除路径dLock.del(path);

推荐 使用:
都是 函数式编程
在业务代码执行完毕后 会释放锁和删除path
1、这个有返回结果
public T mutex(String path, ZkLockCallback zkLockCallback, long time, TimeUnit timeUnit)
2、这个无返回结果
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit)

public class DLock {
 private final Logger logger;
 private static final long TIMEOUT_D = 100L;
 private static final String ROOT_PATH_D = "/dLock";
 private String lockRootPath;
 private CuratorFramework client;

 public DLock(CuratorFramework client) {
  this("/dLock", client);
 }

 public DLock(String lockRootPath, CuratorFramework client) {
  this.logger = LoggerFactory.getLogger(DLock.class);
  this.lockRootPath = lockRootPath;
  this.client = client;
 }
 public InterProcessMutex mutex(String path) {
  if (!StringUtils.startsWith(path, "/")) {
   path = Constant.keyBuilder(new Object[]{"/", path});
  }

  return new InterProcessMutex(this.client, Constant.keyBuilder(new Object[]{this.lockRootPath, "", path}));
 }

 public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback) throws ZkLockException {
  return this.mutex(path, zkLockCallback, 100L, TimeUnit.MILLISECONDS);
 }

 public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {
  String finalPath = this.getLockPath(path);
  InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);

  try {
   if (!mutex.acquire(time, timeUnit)) {
    throw new ZkLockException("acquire zk lock return false");
   }
  } catch (Exception var13) {
   throw new ZkLockException("acquire zk lock failed.", var13);
  }

  T var8;
  try {
   var8 = zkLockCallback.doInLock();
  } finally {
   this.releaseLock(finalPath, mutex);
  }

  return var8;
 }

 private void releaseLock(String finalPath, InterProcessMutex mutex) {
  try {
   mutex.release();
   this.logger.info("delete zk node path:{}", finalPath);
   this.deleteInternal(finalPath);
  } catch (Exception var4) {
   this.logger.error("dlock", "release lock failed, path:{}", finalPath, var4);
//   LogUtil.error(this.logger, "dlock", "release lock failed, path:{}", new Object[]{finalPath, var4});
  }

 }

 public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {
  String finalPath = this.getLockPath(path);
  InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);

  try {
   if (!mutex.acquire(time, timeUnit)) {
    throw new ZkLockException("acquire zk lock return false");
   }
  } catch (Exception var13) {
   throw new ZkLockException("acquire zk lock failed.", var13);
  }

  try {
   zkLockCallback.response();
  } finally {
   this.releaseLock(finalPath, mutex);
  }

 }

 public String getLockPath(String customPath) {
  if (!StringUtils.startsWith(customPath, "/")) {
   customPath = Constant.keyBuilder(new Object[]{"/", customPath});
  }

  String finalPath = Constant.keyBuilder(new Object[]{this.lockRootPath, "", customPath});
  return finalPath;
 }

 private void deleteInternal(String finalPath) {
  try {
   ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(finalPath);
  } catch (Exception var3) {
   this.logger.info("delete zk node path:{} failed", finalPath);
  }

 }

 public void del(String customPath) {
  String lockPath = "";

  try {
   lockPath = this.getLockPath(customPath);
   ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(lockPath);
  } catch (Exception var4) {
   this.logger.info("delete zk node path:{} failed", lockPath);
  }

 }
}
@FunctionalInterface
public interface ZkLockCallback<T> {
 T doInLock();
}

@FunctionalInterface
public interface ZkVoidCallBack {
 void response();
}

public class ZkLockException extends Exception {
 public ZkLockException() {
 }

 public ZkLockException(String message) {
  super(message);
 }

 public ZkLockException(String message, Throwable cause) {
  super(message, cause);
 }
}

配置CuratorConfig

@Configuration
public class CuratorConfig {
 @Value("${zk.connectionString}")
 private String connectionString;

 @Value("${zk.sessionTimeoutMs:500}")
 private int sessionTimeoutMs;

 @Value("${zk.connectionTimeoutMs:500}")
 private int connectionTimeoutMs;

 @Value("${zk.dLockRoot:/dLock}")
 private String dLockRoot;

 @Bean
 public CuratorFactoryBean curatorFactoryBean() {
  return new CuratorFactoryBean(connectionString, sessionTimeoutMs, connectionTimeoutMs);
 }

 @Bean
 @Autowired
 public DLock dLock(CuratorFramework client) {
  return new DLock(dLockRoot, client);
 }
}

测试代码

@RestController
@RequestMapping("/dLock")
public class LockController {

 @Autowired
 private DLock dLock;

 @RequestMapping("/lock")
 public Map testDLock(String no){
  final String path = Constant.keyBuilder("/test/no/", no);
  Long mutex=0l;
  try {
   System.out.println("在拿锁:"+path+System.currentTimeMillis());
    mutex = dLock.mutex(path, () -> {
    try {
     System.out.println("拿到锁了" + System.currentTimeMillis());
     Thread.sleep(10000);
     System.out.println("操作完成了" + System.currentTimeMillis());
    } finally {
     return System.currentTimeMillis();
    }
   }, 1000, TimeUnit.MILLISECONDS);
  } catch (ZkLockException e) {
   System.out.println("拿不到锁呀"+System.currentTimeMillis());
  }
  return Collections.singletonMap("ret",mutex);
 }

 @RequestMapping("/dlock")
 public Map testDLock1(String no){
  final String path = Constant.keyBuilder("/test/no/", no);
  Long mutex=0l;
  try {
   System.out.println("在拿锁:"+path+System.currentTimeMillis());
   InterProcessMutex processMutex = dLock.mutex(path);
   processMutex.acquire();
   System.out.println("拿到锁了" + System.currentTimeMillis());
   Thread.sleep(10000);
   processMutex.release();
   System.out.println("操作完成了" + System.currentTimeMillis());
  } catch (ZkLockException e) {
   System.out.println("拿不到锁呀"+System.currentTimeMillis());
   e.printStackTrace();
  }catch (Exception e){
   e.printStackTrace();
  }
  return Collections.singletonMap("ret",mutex);
 }
 @RequestMapping("/del")
 public Map delDLock(String no){
  final String path = Constant.keyBuilder("/test/no/", no);
  dLock.del(path);
  return Collections.singletonMap("ret",1);
 }
}

以上所述是小编给大家介绍的Java(SpringBoot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我们网站的支持!

(0)

相关推荐

  • 使用dubbo+zookeeper+spring boot构建服务的方法详解

    前言 互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,Dubbo是一个分布式服务框架,在这种情况下诞生的.现在核心业务抽取出来,作为独立的服务,使前端应用能更快速和稳定的响应. Dubbo是什么 Dubbo是Alibaba开源的分布式服务框架,它最大的特点是按照分层的方式来架构,使用这种方式可以使各个层之间解耦合(或者最大限度地松耦合).从服务模型的角度来看,Dubbo采用的是一种非常简单的模型,要么是提供方提供服务,要么是消费方消费服

  • Spring Boot 集成Dubbo框架实例

    使用Spring Boot 与Dubbo集成,这里我之前尝试了使用注解的方式,简单的使用注解注册服务其实是没有问题的,但是当你涉及到使用注解的时候在服务里面引用事务,注入其他对象的时候,会有一些问题.于是我就果断放弃了注解了,使用的是XML,这里可能介绍的是Dubbo,但是如果使用Dubbox的话,基本上是兼容的.接下来,将说说使用XML的方式与Spring Boot在一起开发. 1.创建工程在pom.xml中加入依赖 创建工程名为: (1)springboot-dubbo-provide (2

  • SpringBoot+Dubbo+Seata分布式事务实战详解

    前言 Seata 是 阿里巴巴开源的分布式事务中间件,以高效并且对业务0侵入的方式,解决微服务场景下面临的分布式事务问题. 事实上,官方在GitHub已经给出了多种环境下的Seata应用示例项目,地址:https://github.com/seata/seata-samples. 为什么笔者要重新写一遍呢,主要原因有两点: 官网代码示例中,依赖太多,分不清哪些有什么作用 Seata相关资料较少,笔者在搭建的过程中,遇到了一些坑,记录一下 一.环境准备 本文涉及软件环境如下: SpringBoot

  • springboot2.0整合dubbo的示例代码

    写在前面: 使用springboot作为web框架,方便开发许多,做分布式开发,dubbo又不可少,那么怎么整合在一起呢, 跟我学一遍,至少会用 注意,springboot2.0和springboot1.x与dubbo整合不一样, 1.环境 1.新建一个空的maven项目,作为父工程,新建moudle,,service(接口层,及实现层,没有具体分,),web(web层,springboot项目) 项目结构如下 父pom如下 <properties> <project.build.sou

  • Dubbo在Spring和Spring Boot中的使用详解

    一.在Spring中使用Dubbo 1.Maven依赖 <dependency> <groupId>com.alibaba</groupId> <artifactId>dubbo</artifactId> <version>2.5.3.6</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artif

  • springboot使用dubbo和zookeeper代码实例

    这篇文章主要介绍了springboot使用dubbo和zookeeper代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 创建服务接口模块 接口工程只提供接口,不提供实现,在后面的提供者和消费者中使用 在使用接口的模块中只需要写具体实现类,避免了在每个模块中重复编写接口 在接口中引入依赖包 <dependency> <groupId>org.projectlombok</groupId> <artifact

  • Springboot 整合 Dubbo/ZooKeeper 实现 SOA 案例解析

    一.为啥整合 Dubbo 实现 SOA Dubbo 不单单只是高性能的 RPC 调用框架,更是 SOA 服务治理的一种方案. 核心: 远程通信,向本地调用一样调用远程方法. 集群容错 服务自动发现和注册,可平滑添加或者删除服务提供者. 我们常常使用 Springboot 暴露 HTTP 服务,并走 JSON 模式.但慢慢量大了,一种 SOA 的治理方案.这样可以暴露出 Dubbo 服务接口,提供给 Dubbo 消费者进行 RPC 调用.下面我们详解下如何集成 Dubbo. 二.运行 spring

  • 浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

    通过zookeeper实现分布式锁 1.创建zookeeper的client 首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean { private static final Logger LOGG

  • 浅谈Java springboot日志管理

    一.前言 springboot默认使用Logback组件作为日志管理.Logback是由log4j创始人设计的一个开源日志组件. 在springboot项目中我们不需要额外的添加Logback的依赖,因为在spring-boot-starter或者spring-boot-starter-web中已经包含了Logback的依赖 Logback读取配置文件的步骤 在classpath下查找文件logback-test.xml 如果文件不存在,则查找logback.xml 如果上面两个文件都不存在,L

  • 基于Zookeeper实现分布式锁详解

    目录 1.什么是Zookeeper? 2.Zookeeper节点类型 3.Zookeeper环境搭建 4.Zookeeper基本使用 5.Zookeeper应用场景 6.Zookeeper分布式锁 7.公平式Zookeeper分布式锁 8.zookeeper和Redis锁对比? 1.什么是Zookeeper? Zookeeper是一个分布式的,开源的分布式应用程序协调服务,是Hadoop和hbase的重要组件. 引用官网的图例: 特征: zookeeper的数据机构是一种节点树的数据结构,zNo

  • SpringBoot基于Redis的分布式锁实现过程记录

    目录 一.概述 二.环境搭建 三.模拟一个库存扣减的场景 四.总结 一.概述 什么是分布式锁 在单机环境中,一般在多并发多线程场景下,出现多个线程去抢占一个资源,这个时候会出现线程同步问题,造成执行的结果没有达到预期.我们会用线程间加锁的方式,比如synchronized,lock,volatile,以及JVM并发包中提供的其他工具类去处理此问题. 但是随着技术的发展,分布式系统的出现,各个应用服务都部署在不同节点,由各自的JVM去操控,资源已经不是在 线程 之间的共享,而是变成了 进程 之间的

  • 详解Java如何实现基于Redis的分布式锁

    前言 单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式.其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去看看原理,看懂了之后看代码应该就容易理解了. 我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition方法我这里暂时没实现.这个Lock提供了5个lock方法的变体,可以

  • springboot整合curator实现分布式锁过程

    目录 springboot curator实现分布式锁 理论篇: 实操篇: 项目实际应用中分布式锁介绍 锁的介绍 悲观锁-数据库锁 悲观锁-缓存锁 分布式锁—zookeeper springboot curator实现分布式锁 理论篇: Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架

  • zookeeper实现分布式锁

    一.分布式锁介绍 分布式锁主要用于在分布式环境中保护跨进程.跨主机.跨网络的共享资源实现互斥访问,以达到保证数据的一致性. 二.架构介绍 在介绍使用Zookeeper实现分布式锁之前,首先看当前的系统架构图 解释: 左边的整个区域表示一个Zookeeper集群,locker是Zookeeper的一个持久节点,node_1.node_2.node_3是locker这个持久节点下面的临时顺序节点.client_1.client_2.client_n表示多个客户端,Service表示需要互斥访问的共享

  • 浅谈Java实现分布式事务的三种方案

    一.问题描述 用户支付完成会将支付状态及订单状态保存在订单数据库中,由订单服务去维护订单数据库.由库存服务去维护库存数据库的信息.下图是系统结构图: 如何实现两个分布式服务(订单服务.库存服务)共同完成一件事即订单支付成功自动减库存,这里的关键是如何保证两个分布式服务的事务的一致性. 尝试解决上边的需求,在订单服务中远程调用减库存接口,伪代码如下: 订单支付结果通知方法{ ​ 更新支付表中支付状态为"成功". ​ 远程调用减库存接口减库存. } 上边的逻辑说明: 1.更新支付表状态为本

  • 浅谈Java分布式架构下如何实现分布式锁

    01分布式锁运用场景 互联网秒杀,抢优惠卷,接口幂等性校验.咱们以互联网秒杀为例. @RestController @Slf4j publicclassIndexController{ @Autowired privateRedissonredission; @Autowired privateStringRedisTemplatestringRedisTemplate; @RequestMapping("/deduct_stock") publicStringdeductStock(

  • 浅谈Java如何实现一个基于LRU时间复杂度为O(1)的缓存

    LRU:Least Recently Used最近最少使用,当缓存容量不足时,先淘汰最近最少使用的数据.就像JVM垃圾回收一样,希望将存活的对象移动到内存的一端,然后清除其余空间. 缓存基本操作就是读.写.淘汰删除. 读操作时间复杂度为O(1)的那就是hash操作了,可以使用HashMap索引 key. 写操作时间复杂度为O(1),使用链表结构,在链表的一端插入节点,是可以完成O(1)操作,但是为了配合读,还要再次将节点放入HashMap中,put操作最优是O(1),最差是O(n). 不少童鞋就

随机推荐