java简单手写版本实现时间轮算法

时间轮

关于时间轮的介绍,网上有很多,这里就不重复了

核心思想

  • 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度
  • 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)
  • 每个槽对应一个环形链表存储该时间应该被执行的任务
  • 需要一个线程去驱动指针运转,获取到期任务

以下给出java 简单手写版本实现

代码实现

时间轮主数据结构

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:31
 */
@Slf4j
public class TimeWheel {
 /**
  * 一个槽的时间间隔(时间轮最小刻度)
  */
 private long tickMs;

 /**
  * 时间轮大小(槽的个数)
  */
 private int wheelSize;

 /**
  * 一轮的时间跨度
  */
 private long interval;

 private long currentTime;

 /**
  * 槽
  */
 private TimerTaskList[] buckets;

 /**
  * 上层时间轮
  */
 private volatile TimeWheel overflowWheel;

 /**
  * 一个timer只有一个delayqueue
  */
 private DelayQueue<TimerTaskList> delayQueue;

 public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
  this.currentTime = currentTime;
  this.tickMs = tickMs;
  this.wheelSize = wheelSize;
  this.interval = tickMs * wheelSize;
  this.buckets = new TimerTaskList[wheelSize];
  this.currentTime = currentTime - (currentTime % tickMs);
  this.delayQueue = delayQueue;
  for (int i = 0; i < wheelSize; i++) {
   buckets[i] = new TimerTaskList();
  }
 }

 public boolean add(TimerTaskEntry entry) {
  long expiration = entry.getExpireMs();
  if (expiration < tickMs + currentTime) {
   //到期了
   return false;
  } else if (expiration < currentTime + interval) {
   //扔进当前时间轮的某个槽里,只有时间大于某个槽,才会放进去
   long virtualId = (expiration / tickMs);
   int index = (int) (virtualId % wheelSize);
   TimerTaskList bucket = buckets[index];
   bucket.addTask(entry);
   //设置bucket 过期时间
   if (bucket.setExpiration(virtualId * tickMs)) {
    //设好过期时间的bucket需要入队
    delayQueue.offer(bucket);
    return true;
   }
  } else {
   //当前轮不能满足,需要扔到上一轮
   TimeWheel timeWheel = getOverflowWheel();
   return timeWheel.add(entry);
  }
  return false;
 }

 private TimeWheel getOverflowWheel() {
  if (overflowWheel == null) {
   synchronized (this) {
    if (overflowWheel == null) {
     overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
    }
   }
  }
  return overflowWheel;
 }

 /**
  * 推进指针
  *
  * @param timestamp
  */
 public void advanceLock(long timestamp) {
  if (timestamp > currentTime + tickMs) {
   currentTime = timestamp - (timestamp % tickMs);
   if (overflowWheel != null) {
    this.getOverflowWheel().advanceLock(timestamp);
   }
  }
 }
}

定时器接口

/**
 * 定时器
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:30
 */
public interface Timer {

 /**
  * 添加一个新任务
  *
  * @param timerTask
  */
 void add(TimerTask timerTask);

 /**
  * 推动指针
  *
  * @param timeout
  */
 void advanceClock(long timeout);

 /**
  * 等待执行的任务
  *
  * @return
  */
 int size();

 /**
  * 关闭服务,剩下的无法被执行
  */
 void shutdown();
}

定时器实现

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:33
 */
@Slf4j
public class SystemTimer implements Timer {
 /**
  * 底层时间轮
  */
 private TimeWheel timeWheel;
 /**
  * 一个Timer只有一个延时队列
  */
 private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
 /**
  * 过期任务执行线程
  */
 private ExecutorService workerThreadPool;
 /**
  * 轮询delayQueue获取过期任务线程
  */
 private ExecutorService bossThreadPool;

 public SystemTimer() {
  this.timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
  this.workerThreadPool = Executors.newFixedThreadPool(100);
  this.bossThreadPool = Executors.newFixedThreadPool(1);
  //20ms推动一次时间轮运转
  this.bossThreadPool.submit(() -> {
   for (; ; ) {
    this.advanceClock(20);
   }
  });
 }

 public void addTimerTaskEntry(TimerTaskEntry entry) {
  if (!timeWheel.add(entry)) {
   //已经过期了
   TimerTask timerTask = entry.getTimerTask();
   log.info("=====任务:{} 已到期,准备执行============",timerTask.getDesc());
   workerThreadPool.submit(timerTask);
  }
 }

 @Override
 public void add(TimerTask timerTask) {
  log.info("=======添加任务开始====task:{}", timerTask.getDesc());
  TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis());
  timerTask.setTimerTaskEntry(entry);
  addTimerTaskEntry(entry);
 }

 /**
  * 推动指针运转获取过期任务
  *
  * @param timeout 时间间隔
  * @return
  */
 @Override
 public synchronized void advanceClock(long timeout) {
  try {
   TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
   if (bucket != null) {
    //推进时间
    timeWheel.advanceLock(bucket.getExpiration());
    //执行过期任务(包含降级)
    bucket.clear(this::addTimerTaskEntry);
   }
  } catch (InterruptedException e) {
   log.error("advanceClock error");
  }
 }

 @Override
 public int size() {
  //todo
  return 0;
 }

 @Override
 public void shutdown() {
  this.bossThreadPool.shutdown();
  this.workerThreadPool.shutdown();
  this.timeWheel = null;
 }
}

存储任务的环形链表

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@Data
@Slf4j
class TimerTaskList implements Delayed {
 /**
  * TimerTaskList 环形链表使用一个虚拟根节点root
  */
 private TimerTaskEntry root = new TimerTaskEntry(null, -1);

 {
  root.next = root;
  root.prev = root;
 }

 /**
  * bucket的过期时间
  */
 private AtomicLong expiration = new AtomicLong(-1L);

 public long getExpiration() {
  return expiration.get();
 }

 /**
  * 设置bucket的过期时间,设置成功返回true
  *
  * @param expirationMs
  * @return
  */
 boolean setExpiration(long expirationMs) {
  return expiration.getAndSet(expirationMs) != expirationMs;
 }

 public boolean addTask(TimerTaskEntry entry) {
  boolean done = false;
  while (!done) {
   //如果TimerTaskEntry已经在别的list中就先移除,同步代码块外面移除,避免死锁,一直到成功为止
   entry.remove();
   synchronized (this) {
    if (entry.timedTaskList == null) {
     //加到链表的末尾
     entry.timedTaskList = this;
     TimerTaskEntry tail = root.prev;
     entry.prev = tail;
     entry.next = root;
     tail.next = entry;
     root.prev = entry;
     done = true;
    }
   }
  }
  return true;
 }

 /**
  * 从 TimedTaskList 移除指定的 timerTaskEntry
  *
  * @param entry
  */
 public void remove(TimerTaskEntry entry) {
  synchronized (this) {
   if (entry.getTimedTaskList().equals(this)) {
    entry.next.prev = entry.prev;
    entry.prev.next = entry.next;
    entry.next = null;
    entry.prev = null;
    entry.timedTaskList = null;
   }
  }
 }

 /**
  * 移除所有
  */
 public synchronized void clear(Consumer<TimerTaskEntry> entry) {
  TimerTaskEntry head = root.next;
  while (!head.equals(root)) {
   remove(head);
   entry.accept(head);
   head = root.next;
  }
  expiration.set(-1L);
 }

 @Override
 public long getDelay(TimeUnit unit) {
  return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
 }

 @Override
 public int compareTo(Delayed o) {
  if (o instanceof TimerTaskList) {
   return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
  }
  return 0;
 }
}

存储任务的容器entry

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@Data
class TimerTaskEntry implements Comparable<TimerTaskEntry> {
 private TimerTask timerTask;
 private long expireMs;
 volatile TimerTaskList timedTaskList;
 TimerTaskEntry next;
 TimerTaskEntry prev;

 public TimerTaskEntry(TimerTask timedTask, long expireMs) {
  this.timerTask = timedTask;
  this.expireMs = expireMs;
  this.next = null;
  this.prev = null;
 }

 void remove() {
  TimerTaskList currentList = timedTaskList;
  while (currentList != null) {
   currentList.remove(this);
   currentList = timedTaskList;
  }
 }

 @Override
 public int compareTo(TimerTaskEntry o) {
  return ((int) (this.expireMs - o.expireMs));
 }
}

任务包装类(这里也可以将工作任务以线程变量的方式去传入)

@Data
@Slf4j
class TimerTask implements Runnable {
 /**
  * 延时时间
  */
 private long delayMs;
 /**
  * 任务所在的entry
  */
 private TimerTaskEntry timerTaskEntry;

 private String desc;

 public TimerTask(String desc, long delayMs) {
  this.desc = desc;
  this.delayMs = delayMs;
  this.timerTaskEntry = null;
 }

 public synchronized void setTimerTaskEntry(TimerTaskEntry entry) {
  // 如果这个timetask已经被一个已存在的TimerTaskEntry持有,先移除一个
  if (timerTaskEntry != null && timerTaskEntry != entry) {
   timerTaskEntry.remove();
  }
  timerTaskEntry = entry;
 }

 public TimerTaskEntry getTimerTaskEntry() {
  return timerTaskEntry;
 }

 @Override
 public void run() {
  log.info("============={}任务执行", desc);
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java时间轮算法的实现代码示例

    考虑这样一个场景,现在有5000个任务,要让这5000个任务每隔5分中触发某个操作,怎么去实现这个需求.大部分人首先想到的是使用定时器,但是5000个任务,你就要用5000个定时器,一个定时器就是一个线程,你懂了吧,这种方法肯定是不行的. 针对这个场景,催生了时间轮算法,时间轮到底是什么?我一贯的风格,自行谷歌去.大发慈悲,发个时间轮介绍你们看看,看文字和图就好了,代码不要看了,那个文章里的代码运行不起来,时间轮介绍. 看好了介绍,我们就开始动手吧. 开发环境:idea + jdk1.8 + m

  • java简单手写版本实现时间轮算法

    时间轮 关于时间轮的介绍,网上有很多,这里就不重复了 核心思想 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念) 每个槽对应一个环形链表存储该时间应该被执行的任务 需要一个线程去驱动指针运转,获取到期任务 以下给出java 简单手写版本实现 代码实现 时间轮主数据结构 /** * @author apdoer * @version 1.0 * @date

  • java实现手写一个简单版的线程池

    有些人可能对线程池比较陌生,并且更不熟悉线程池的工作原理.所以他们在使用线程的时候,多数情况下都是new Thread来实现多线程.但是,往往良好的多线程设计大多都是使用线程池来实现的. 为什么要使用线程 降低资源的消耗.降低线程创建和销毁的资源消耗.提高响应速度:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间提高线程的可管理性 下图所示为线程池的实现原理:调用方不断向线程池中提交任务:线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者-消费者模型. 要实现一

  • Java实现手写一个线程池的示例代码

    目录 概述 线程池框架设计 代码实现 阻塞队列的实现 线程池消费端实现 获取任务超时设计 拒绝策略设计 概述 线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记. 线程池框架设计 我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的.同

  • PyTorch简单手写数字识别的实现过程

    目录 一.包导入及所需数据的下载 关于数据集引入的改动 二.进行数据处理变换操作 三.数据预览测试和数据装载 四.模型搭建和参数优化 关于模型搭建的改动 总代码: 测试 总结 具体流程: ① 导入相应的包,下载训练集和测试集对应需要的图像数据. ②进行图像数据的变换,使图像数据转化成pytorch可识别并计算的张量数据类型 ③数据预览测试和数据装载 ④模型搭建和参数优化 ⑤总代码 ⑥测试 一.包导入及所需数据的下载 torchvision包的主要功能是实现数据的处理.导入.预览等,所以如果需要对

  • Java 通过手写分布式雪花SnowFlake生成ID方法详解

    目录 SnowFlake算法 SnowFlake优点: SnowFlake算法 SnowFlake算法生成id的结果是一个64bit大小的整数,它的结构如下图: 分为四段: 第一段: 1位为未使用,永远固定为0. (因为二进制中最高位是符号位,1表示负数,0表示正数.生成的id一般都是用正整数,所以最高位固定为0 ) 第二段: 41位为毫秒级时间(41位的长度可以使用69年) 第三段: 10位为workerId(10位的长度最多支持部署1024个节点) (这里的10位又分为两部分,第一部分5位表

  • Java 通过手写分布式雪花SnowFlake生成ID方法详解

    目录 SnowFlake算法 SnowFlake优点 SnowFlake不足 SnowFlake算法 SnowFlake算法生成id的结果是一个64bit大小的整数,它的结构如下图: 分为四段: 第一段: 1位为未使用,永远固定为0. (因为二进制中最高位是符号位,1表示负数,0表示正数.生成的id一般都是用正整数,所以最高位固定为0 ) 第二段: 41位为毫秒级时间(41位的长度可以使用69年) 第三段: 10位为workerId(10位的长度最多支持部署1024个节点) (这里的10位又分为

  • Java实现手写自旋锁的示例代码

    目录 前言 自旋锁 原子性 自己动手写自旋锁 自己动手写可重入自旋锁 总结 前言 我们在写并发程序的时候,一个非常常见的需求就是保证在某一个时刻只有一个线程执行某段代码,像这种代码叫做临界区,而通常保证一个时刻只有一个线程执行临界区的代码的方法就是锁.在本篇文章当中我们将会仔细分析和学习自旋锁,所谓自旋锁就是通过while循环实现的,让拿到锁的线程进入临界区执行代码,让没有拿到锁的线程一直进行while死循环,这其实就是线程自己“旋”在while循环了,因而这种锁就叫做自旋锁. 自旋锁 原子性

  • Java实现手写线程池的示例代码

    目录 前言 线程池给我们提供的功能 工具介绍 Worker设计 线程池设计 总结 前言 在我们的日常的编程当中,并发是始终离不开的主题,而在并发多线程当中,线程池又是一个不可规避的问题.多线程可以提高我们并发程序的效率,可以让我们不去频繁的申请和释放线程,这是一个很大的花销,而在线程池当中就不需要去频繁的申请线程,他的主要原理是申请完线程之后并不中断,而是不断的去队列当中领取任务,然后执行,反复这样的操作.在本篇文章当中我们主要是介绍线程池的原理,因此我们会自己写一个非常非常简单的线程池,主要帮

  • Java实现手写线程池实例并测试详解

    前言 在之前的文章中介绍过线程池的核心原理,在一次面试中面试官让手写线程池,这块知识忘记的差不多了,因此本篇文章做一个回顾. 希望能够加深自己的印象以及帮助到其他的小伙伴儿们 在线程池核心原理篇介绍过线程池的核心原理,今天来模拟线程池和工作队列的流程,以及编写代码和测试类进行测试.下面附下之前线程池的核心流程: 在线程池核心原理的源码中,涉及到了一系列的流程,包括线程池队列数量是否已满,运用什么样的拒绝策略等.在我们手写线程池的代码中,不需要考虑那么多因素,只需要模拟简单的情景和过程,因此整体来

  • 微信小程序实现简单手写签名组件的方法实例

    目录 背景: 需求: 效果 一.思路 二.实现 1. 页面与样式 2. 初始化 3. 点击时 4. 签名时 三.总结 背景: 在做项目过程中,需要在微信小程序中实现手写签名组件.在网上找了微信小程序手写签名实现,但是都是不太理想.在实际运用中,会因为实时计算较多的贝塞尔曲线而产生卡顿.效果不理想.所以退一步,不需要笔锋以及笔迹模拟效果.只需要简单的手写签名实现. 需求: 可以实现用户在微信小程序上手写签名. 需要组件化. 效果 一.思路 在微信小程序中,我们使用canvas组件实现.将用户的输入

随机推荐