使用Redis实现延时任务的解决方案

最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。

候选方案对比

下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势。

方案 优势 劣势 选用场景
JDK 内置的延迟队列 DelayQueue 实现简单 数据内存态,不可靠 一致性相对低的场景
调度框架和 MySQL 进行短间隔轮询 实现简单,可靠性高 存在明显的性能瓶颈 数据量较少实时性相对低的场景
RabbitMQ 的 DLX 和 TTL,一般称为 死信队列 方案 异步交互可以削峰 延时的时间长度不可控,如果数据需要持久化则性能会降低 -
调度框架和 Redis 进行短间隔轮询 数据持久化,高性能 实现难度大 常见于支付结果回调方案
时间轮 实时性高 实现难度大,内存消耗大 实时性高的场景

如果应用的数据量不高,实时性要求比较低,选用调度框架和 MySQL 进行短间隔轮询这个方案是最优的方案。但是笔者遇到的场景数据量相对比较大,实时性并不高,采用扫库的方案一定会对 MySQL 实例造成比较大的压力。记得很早之前,看过一个PPT叫《盒子科技聚合支付系统演进》,其中里面有一张图片给予笔者一点启发:

里面刚好用到了调度框架和 Redis 进行短间隔轮询实现延时任务的方案,不过为了分摊应用的压力,图中的方案还做了分片处理。鉴于笔者当前业务紧迫,所以在第一期的方案暂时不考虑分片,只做了一个简化版的实现。

由于PPT中没有任何的代码或者框架贴出,有些需要解决的技术点需要自行思考,下面会重现一次整个方案实现的详细过程。

场景设计

实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做 OrderMessage ),订单消息需要延迟5到15秒后进行异步处理。

否决的候选方案实现思路

下面介绍一下其它四个不选用的候选方案,结合一些伪代码和流程分析一下实现过程。

JDK内置延迟队列

DelayQueue 是一个阻塞队列的实现,它的队列元素必须是 Delayed 的子类,这里做个简单的例子:

public class DelayQueueMain {
  private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
  public static void main(String[] args) throws Exception {
    DelayQueue<OrderMessage> queue = new DelayQueue<>();
    // 默认延迟5秒
    OrderMessage message = new OrderMessage("ORDER_ID_10086");
    queue.add(message);
    // 延迟6秒
    message = new OrderMessage("ORDER_ID_10087", 6);
    queue.add(message);
    // 延迟10秒
    message = new OrderMessage("ORDER_ID_10088", 10);
    queue.add(message);
    ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
      Thread thread = new Thread(r);
      thread.setName("DelayWorker");
      thread.setDaemon(true);
      return thread;
    });
    LOGGER.info("开始执行调度线程...");
    executorService.execute(() -> {
      while (true) {
        try {
          OrderMessage task = queue.take();
          LOGGER.info("延迟处理订单消息,{}", task.getDescription());
        } catch (Exception e) {
          LOGGER.error(e.getMessage(), e);
        }
      }
    });
    Thread.sleep(Integer.MAX_VALUE);
  }
  private static class OrderMessage implements Delayed {
    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    /**
     * 默认延迟5000毫秒
     */
    private static final long DELAY_MS = 1000L * 5;
    /**
     * 订单ID
     */
    private final String orderId;
    /**
     * 创建时间戳
     */
    private final long timestamp;
    /**
     * 过期时间
     */
    private final long expire;
    /**
     * 描述
     */
    private final String description;
    public OrderMessage(String orderId, long expireSeconds) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
      this.expire = this.timestamp + expireSeconds * 1000L;
      this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
          LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
          LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
    }
    public OrderMessage(String orderId) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
      this.expire = this.timestamp + DELAY_MS;
      this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s", orderId,
          LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
          LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
    }
    public String getOrderId() {
      return orderId;
    }
    public long getTimestamp() {
      return timestamp;
    }
    public long getExpire() {
      return expire;
    }
    public String getDescription() {
      return description;
    }
    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
      return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
  }
}

注意一下, OrderMessage 实现 Delayed 接口,关键是需要实现 Delayed#getDelay()Delayed#compareTo() 。运行一下 main() 方法:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 开始执行调度线程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10086]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10087]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10088]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:18

调度框架 + MySQL

使用调度框架对 MySQL 表进行短间隔轮询是实现难度比较低的方案,通常服务刚上线,表数据不多并且实时性不高的情况下应该首选这个方案。不过要注意以下几点:

MySQL

引入 QuartzMySQL 的Java驱动包和 spring-boot-starter-jdbc (这里只是为了方便用相对轻量级的框架实现,生产中可以按场景按需选择其他更合理的框架):

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.48</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jdbc</artifactId>
  <version>2.1.7.RELEASE</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.quartz-scheduler</groupId>
  <artifactId>quartz</artifactId>
  <version>2.3.1</version>
  <scope>test</scope>
</dependency>

假设表设计如下:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;

USE `delayTask`;

CREATE TABLE `t_order_message`
(
  id      BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
  order_id   VARCHAR(50) NOT NULL COMMENT '订单ID',
  create_time DATETIME  NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建日期时间',
  edit_time  DATETIME  NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期时间',
  retry_times TINYINT   NOT NULL DEFAULT 0 COMMENT '重试次数',
  order_status TINYINT   NOT NULL DEFAULT 0 COMMENT '订单状态',
  INDEX idx_order_id (order_id),
  INDEX idx_create_time (create_time)
) COMMENT '订单信息表';

# 写入两条测试数据
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

编写代码:

// 常量
public class OrderConstants {

  public static final int MAX_RETRY_TIMES = 5;

  public static final int PENDING = 0;

  public static final int SUCCESS = 1;

  public static final int FAIL = -1;

  public static final int LIMIT = 10;
}

// 实体
@Builder
@Data
public class OrderMessage {

  private Long id;
  private String orderId;
  private LocalDateTime createTime;
  private LocalDateTime editTime;
  private Integer retryTimes;
  private Integer orderStatus;
}

// DAO
@RequiredArgsConstructor
public class OrderMessageDao {

  private final JdbcTemplate jdbcTemplate;

  private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
    List<OrderMessage> list = Lists.newArrayList();
    while (r.next()) {
      list.add(OrderMessage.builder()
          .id(r.getLong("id"))
          .orderId(r.getString("order_id"))
          .createTime(r.getTimestamp("create_time").toLocalDateTime())
          .editTime(r.getTimestamp("edit_time").toLocalDateTime())
          .retryTimes(r.getInt("retry_times"))
          .orderStatus(r.getInt("order_status"))
          .build());
    }
    return list;
  };

  public List<OrderMessage> selectPendingRecords(LocalDateTime start,
                          LocalDateTime end,
                          List<Integer> statusList,
                          int maxRetryTimes,
                          int limit) {
    StringJoiner joiner = new StringJoiner(",");
    statusList.forEach(s -> joiner.add(String.valueOf(s)));
    return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
            "AND order_status IN (?) AND retry_times < ? LIMIT ?",
        p -> {
          p.setTimestamp(1, Timestamp.valueOf(start));
          p.setTimestamp(2, Timestamp.valueOf(end));
          p.setString(3, joiner.toString());
          p.setInt(4, maxRetryTimes);
          p.setInt(5, limit);
        }, M);
  }

  public int updateOrderStatus(Long id, int status) {
    return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
        p -> {
          p.setInt(1, status);
          p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
          p.setLong(3, id);
        });
  }
}

// Service
@RequiredArgsConstructor
public class OrderMessageService {

  private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);

  private final OrderMessageDao orderMessageDao;

  private static final List<Integer> STATUS = Lists.newArrayList();

  static {
    STATUS.add(OrderConstants.PENDING);
    STATUS.add(OrderConstants.FAIL);
  }

  public void executeDelayJob() {
    LOGGER.info("订单处理定时任务开始执行......");
    LocalDateTime end = LocalDateTime.now();
    // 一天前
    LocalDateTime start = end.minusDays(1);
    List<OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
    if (!list.isEmpty()) {
      for (OrderMessage m : list) {
        LOGGER.info("处理订单[{}],状态由{}更新为{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
        // 这里其实可以优化为批量更新
        orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
      }
    }
    LOGGER.info("订单处理定时任务开始完毕......");
  }
}

// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {

  @Override
  public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
    service.executeDelayJob();
  }

  public static void main(String[] args) throws Exception {
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
    config.setDriverClassName(Driver.class.getName());
    config.setUsername("root");
    config.setPassword("root");
    HikariDataSource dataSource = new HikariDataSource(config);
    OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
    OrderMessageService service = new OrderMessageService(orderMessageDao);
    // 内存模式的调度器
    StdSchedulerFactory factory = new StdSchedulerFactory();
    Scheduler scheduler = factory.getScheduler();
    // 这里没有用到IOC容器,直接用Quartz数据集合传递服务引用
    JobDataMap jobDataMap = new JobDataMap();
    jobDataMap.put("orderMessageService", service);
    // 新建Job
    JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
        .withIdentity("orderMessageDelayJob", "delayJob")
        .usingJobData(jobDataMap)
        .build();
    // 新建触发器,10秒执行一次
    Trigger trigger = TriggerBuilder.newTrigger()
        .withIdentity("orderMessageDelayTrigger", "delayJob")
        .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
        .build();
    scheduler.scheduleJob(job, trigger);
    // 启动调度器
    scheduler.start();
    Thread.sleep(Integer.MAX_VALUE);
  }
}

这个例子里面用了 create_time 做轮询,实际上可以添加一个调度时间 schedule_time 列做轮询,这样子才能更容易定制空闲时和忙碌时候的调度策略。上面的示例的运行效果如下:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
 Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
 NOT STARTED.
 Currently in standby mode.
 Number of jobs executed: 0
 Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
 Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始执行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10086],状态由0更新为1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10087],状态由0更新为1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始完毕......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers

RabbitMQ死信队列

使用 RabbitMQ 死信队列依赖于 RabbitMQ 的两个特性: TTLDLX

TTLTime To Live ,消息存活时间,包括两个维度:队列消息存活时间和消息本身的存活时间。

DLXDead Letter Exchange ,死信交换器。

画个图描述一下这两个特性:

下面为了简单起见, TTL 使用了针对队列的维度。引入 RabbitMQ 的Java驱动:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.7.3</version>
  <scope>test</scope>
</dependency>

代码如下:

public class DlxMain {

  private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    Connection connection = factory.newConnection();
    Channel producerChannel = connection.createChannel();
    Channel consumerChannel = connection.createChannel();
    // dlx交换器名称为dlx.exchange,类型是direct,绑定键为dlx.key,队列名为dlx.queue
    producerChannel.exchangeDeclare("dlx.exchange", "direct");
    producerChannel.queueDeclare("dlx.queue", false, false, false, null);
    producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
    Map<String, Object> queueArgs = new HashMap<>();
    // 设置队列消息过期时间,5秒
    queueArgs.put("x-message-ttl", 5000);
    // 指定DLX相关参数
    queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
    queueArgs.put("x-dead-letter-routing-key", "dlx.key");
    // 声明业务队列
    producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
    ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
      Thread thread = new Thread(r);
      thread.setDaemon(true);
      thread.setName("DlxConsumer");
      return thread;
    });
    // 启动消费者
    executorService.execute(() -> {
      try {
        consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
      } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
      }
    });
    OrderMessage message = new OrderMessage("10086");
    producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
        message.getDescription().getBytes(StandardCharsets.UTF_8));
    LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());

    message = new OrderMessage("10087");
    producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
        message.getDescription().getBytes(StandardCharsets.UTF_8));
    LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());

    message = new OrderMessage("10088");
    producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
        message.getDescription().getBytes(StandardCharsets.UTF_8));
    LOGGER.info("发送消息成功,订单ID:{}", message.getOrderId());

    Thread.sleep(Integer.MAX_VALUE);
  }

  private static class DlxConsumer extends DefaultConsumer {

    DlxConsumer(Channel channel) {
      super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag,
                  Envelope envelope,
                  AMQP.BasicProperties properties,
                  byte[] body) throws IOException {
      LOGGER.info("处理消息成功:{}", new String(body, StandardCharsets.UTF_8));
    }
  }

  private static class OrderMessage {

    private final String orderId;
    private final long timestamp;
    private final String description;

    OrderMessage(String orderId) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
      this.description = String.format("订单[%s],订单创建时间为:%s", orderId,
          LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
    }

    public String getOrderId() {
      return orderId;
    }

    public long getTimestamp() {
      return timestamp;
    }

    public String getDescription() {
      return description;
    }
  }
}

运行 main() 方法结果如下:

16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10086],订单创建时间为:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10087],订单创建时间为:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10088],订单创建时间为:2019-08-20 16:35:58

时间轮

时间轮 TimingWheel 是一种高效、低延迟的调度数据结构,底层采用数组实现存储任务列表的环形队列,示意图如下:

这里暂时不对时间轮和其实现作分析,只简单举例说明怎么使用时间轮实现延时任务。这里使用 Netty 提供的 HashedWheelTimer ,引入依赖:

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-common</artifactId>
  <version>4.1.39.Final</version>
</dependency>

代码如下:

public class HashedWheelTimerMain {

  private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

  public static void main(String[] args) throws Exception {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = r -> {
      Thread thread = new Thread(r);
      thread.setDaemon(true);
      thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
      return thread;
    };
    // tickDuration - 每tick一次的时间间隔, 每tick一次就会到达下一个槽位
    // unit - tickDuration的时间单位
    // ticksPerWhee - 时间轮中的槽位数
    Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
    TimerTask timerTask = new DefaultTimerTask("10086");
    timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
    timerTask = new DefaultTimerTask("10087");
    timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
    timerTask = new DefaultTimerTask("10088");
    timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
    Thread.sleep(Integer.MAX_VALUE);
  }

  private static class DefaultTimerTask implements TimerTask {

    private final String orderId;
    private final long timestamp;

    public DefaultTimerTask(String orderId) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
    }

    @Override
    public void run(Timeout timeout) throws Exception {
      System.out.println(String.format("任务执行时间:%s,订单创建时间:%s,订单ID:%s",
          LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId));
    }
  }
}

运行结果:

任务执行时间:2019-08-20 17:19:49.310,订单创建时间:2019-08-20 17:19:43.294,订单ID:10086
任务执行时间:2019-08-20 17:19:54.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10087
任务执行时间:2019-08-20 17:19:59.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10088

一般来说,任务执行的时候应该使用另外的业务线程池,以免阻塞时间轮本身的运动。

选用的方案实现过程

最终选用了基于 Redis 的有序集合 Sorted SetQuartz 短轮询进行实现。具体方案是:

  • 订单创建的时候,订单ID和当前时间戳分别作为 Sorted Set 的member和score添加到订单队列 Sorted Set 中。
  • 订单创建的时候,订单ID和推送内容 JSON 字符串分别作为field和value添加到订单队列内容 Hash 中。
  • 第1步和第2步操作的时候用 Lua 脚本保证原子性。
  • 使用一个异步线程通过 Sorted Set 的命令 ZREVRANGEBYSCORE 弹出指定数量的订单ID对应的订单队列内容 Hash 中的订单推送内容数据进行处理。

对于第4点处理有两种方案:

  • 方案一:弹出订单内容数据的同时进行数据删除,也就是 ZREVRANGEBYSCOREZREMHDEL 命令要在同一个 Lua 脚本中执行,这样的话 Lua 脚本的编写难度大,并且由于弹出数据已经在 Redis 中删除,如果数据处理失败则可能需要从数据库重新查询补偿。
  • 方案二:弹出订单内容数据之后,在数据处理完成的时候再主动删除订单队列 Sorted Set 和订单队列内容 Hash 中对应的数据,这样的话需要控制并发,有重复执行的可能性。

最终暂时选用了方案一,也就是从 Sorted Set 弹出订单ID并且从 Hash 中获取完推送数据之后马上删除这两个集合中对应的数据。方案的流程图大概是这样:

这里先详细说明一下用到的 Redis 命令。

Sorted Set相关命令

ZADD 命令 - 将一个或多个成员元素及其分数值加入到有序集当中。

ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN

ZREVRANGEBYSCORE 命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。

ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

max:分数区间 - 最大分数。 min:分数区间 - 最小分数。 WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。 LIMIT:可选参数,offset和count原理和 MySQLLIMIT offset,size 一致,如果不指定此参数则返回整个集合的数据。 ZREM 命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。

ZREM key member [member ...]

Hash相关命令 HMSET 命令 - 同时将多个field-value(字段-值)对设置到哈希表中。

HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN

HDEL 命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。

HDEL KEY_NAME FIELD1.. FIELDN

Lua相关 加载 Lua 脚本并且返回脚本的 SHA-1 字符串: SCRIPT LOAD script 。 执行已经加载的 Lua 脚本: EVALSHA sha1 numkeys key [key ...] arg [arg ...]unpack 函数可以把 table 类型的参数转化为可变参数,不过需要注意的是 unpack 函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见 Stackoverflow 的提问 table.unpack() only returns the first element 。

PS:如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。

引入依赖:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-dependencies</artifactId>
      <version>2.1.7.RELEASE</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.1</version>
  </dependency>
  <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>5.1.9.RELEASE</version>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.59</version>
  </dependency>
</dependencies>

编写 Lua 脚本 /lua/enqueue.lua/lua/dequeue.lua

-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil

-- /lua/dequeue.lua
-- 参考jesque的部分Lua脚本实现
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
  if type == 'zset' then
    local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
    if list ~= nil and #list > 0 then
      -- unpack函数能把table转化为可变参数
      redis.call('ZREM', zset_key, unpack(list))
      local result = redis.call('HMGET', hash_key, unpack(list))
      redis.call('HDEL', hash_key, unpack(list))
      return result
    end
  end
end
return nil

编写核心API代码:

// Jedis提供者
@Component
public class JedisProvider implements InitializingBean {

  private JedisPool jedisPool;

  @Override
  public void afterPropertiesSet() throws Exception {
    jedisPool = new JedisPool();
  }

  public Jedis provide(){
    return jedisPool.getResource();
  }
}

// OrderMessage
@Data
public class OrderMessage {

  private String orderId;
  private BigDecimal amount;
  private Long userId;
}

// 延迟队列接口
public interface OrderDelayQueue {

  void enqueue(OrderMessage message);

  List<OrderMessage> dequeue(String min, String max, String offset, String limit);

  List<OrderMessage> dequeue();

  String enqueueSha();

  String dequeueSha();
}

// 延迟队列实现类
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

  private static final String MIN_SCORE = "0";
  private static final String OFFSET = "0";
  private static final String LIMIT = "10";
  private static final String ORDER_QUEUE = "ORDER_QUEUE";
  private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
  private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
  private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
  private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
  private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
  private static final List<String> KEYS = Lists.newArrayList();

  private final JedisProvider jedisProvider;

  static {
    KEYS.add(ORDER_QUEUE);
    KEYS.add(ORDER_DETAIL_QUEUE);
  }

  @Override
  public void enqueue(OrderMessage message) {
    List<String> args = Lists.newArrayList();
    args.add(message.getOrderId());
    args.add(String.valueOf(System.currentTimeMillis()));
    args.add(message.getOrderId());
    args.add(JSON.toJSONString(message));
    try (Jedis jedis = jedisProvider.provide()) {
      jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
    }
  }

  @Override
  public List<OrderMessage> dequeue() {
    // 30分钟之前
    String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
    return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
  }

  @SuppressWarnings("unchecked")
  @Override
  public List<OrderMessage> dequeue(String min, String max, String offset, String limit) {
    List<String> args = new ArrayList<>();
    args.add(max);
    args.add(min);
    args.add(offset);
    args.add(limit);
    List<OrderMessage> result = Lists.newArrayList();
    try (Jedis jedis = jedisProvider.provide()) {
      List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
      if (null != eval) {
        for (String e : eval) {
          result.add(JSON.parseObject(e, OrderMessage.class));
        }
      }
    }
    return result;
  }

  @Override
  public String enqueueSha() {
    return ENQUEUE_LUA_SHA.get();
  }

  @Override
  public String dequeueSha() {
    return DEQUEUE_LUA_SHA.get();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    // 加载Lua脚本
    loadLuaScript();
  }

  private void loadLuaScript() throws Exception {
    try (Jedis jedis = jedisProvider.provide()) {
      ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
      String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
      String sha = jedis.scriptLoad(luaContent);
      ENQUEUE_LUA_SHA.compareAndSet(null, sha);
      resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
      luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
      sha = jedis.scriptLoad(luaContent);
      DEQUEUE_LUA_SHA.compareAndSet(null, sha);
    }
  }

  public static void main(String[] as) throws Exception {
    DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    JedisProvider jedisProvider = new JedisProvider();
    jedisProvider.afterPropertiesSet();
    RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
    queue.afterPropertiesSet();
    // 写入测试数据
    OrderMessage message = new OrderMessage();
    message.setAmount(BigDecimal.valueOf(10086));
    message.setOrderId("ORDER_ID_10086");
    message.setUserId(10086L);
    message.setTimestamp(LocalDateTime.now().format(f));
    List<String> args = Lists.newArrayList();
    args.add(message.getOrderId());
    // 测试需要,score设置为30分钟之前
    args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
    args.add(message.getOrderId());
    args.add(JSON.toJSONString(message));
    try (Jedis jedis = jedisProvider.provide()) {
      jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
    }
    List<OrderMessage> dequeue = queue.dequeue();
    System.out.println(dequeue);
  }
}

这里先执行一次 main() 方法验证一下延迟队列是否生效:

[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)]

确定延迟队列的代码没有问题,接着编写一个 QuartzJob 类型的消费者 OrderMessageConsumer

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

  private static final AtomicInteger COUNTER = new AtomicInteger();
  private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
    Thread thread = new Thread(r);
    thread.setDaemon(true);
    thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
    return thread;
  });
  private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);

  @Autowired
  private OrderDelayQueue orderDelayQueue;

  @Override
  public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    LOGGER.info("订单消息处理定时任务开始执行......");
    List<OrderMessage> messages = orderDelayQueue.dequeue();
    if (!messages.isEmpty()) {
      // 简单的列表等分放到线程池中执行
      List<List<OrderMessage>> partition = Lists.partition(messages, 2);
      int size = partition.size();
      final CountDownLatch latch = new CountDownLatch(size);
      for (List<OrderMessage> p : partition) {
        BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
      }
      try {
        latch.await();
      } catch (InterruptedException ignore) {
        //ignore
      }
    }
    stopWatch.stop();
    LOGGER.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis());
  }

  @RequiredArgsConstructor
  private static class ConsumeTask implements Runnable {

    private final List<OrderMessage> messages;
    private final CountDownLatch latch;

    @Override
    public void run() {
      try {
        // 实际上这里应该单条捕获异常
        for (OrderMessage message : messages) {
          LOGGER.info("处理订单信息,内容:{}", message);
        }
      } finally {
        latch.countDown();
      }
    }
  }
}

上面的消费者设计的时候需要有以下考量:

  • 使用 @DisallowConcurrentExecution 注解不允许 Job 并发执行,其实多个 Job 并发执行意义不大,因为我们采用的是短间隔的轮询,而 Redis 是单线程处理命令,在客户端做多线程其实效果不佳。
  • 线程池 BUSINESS_WORKER_POOL 的线程容量或者队列应该综合 LIMIT 值、等分订单信息列表中使用的 size 值以及 ConsumeTask 里面具体的执行时间进行考虑,这里只是为了方便使用了固定容量的线程池。
  • ConsumeTask 中应该对每一条订单信息的处理单独捕获异常和吞并异常,或者把处理单个订单信息的逻辑封装成一个不抛出异常的方法。

其他 Quartz 相关的代码:

// Quartz配置类
@Configuration
public class QuartzAutoConfiguration {

  @Bean
  public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    factory.setAutoStartup(true);
    factory.setJobFactory(quartzAutowiredJobFactory);
    return factory;
  }

  @Bean
  public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
    return new QuartzAutowiredJobFactory();
  }

  public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

    private AutowireCapableBeanFactory autowireCapableBeanFactory;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
      this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
    }

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
      Object jobInstance = super.createJobInstance(bundle);
      // 这里利用AutowireCapableBeanFactory从新建的Job实例做一次自动装配,得到一个原型(prototype)的JobBean实例
      autowireCapableBeanFactory.autowireBean(jobInstance);
      return jobInstance;
    }
  }
}

这里暂时使用了内存态的 RAMJobStore 去存放任务和触发器的相关信息,如果在生产环境最好替换成基于 MySQL 也就是 JobStoreTX 进行集群化,最后是启动函数和 CommandLineRunner 的实现:

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {

  @Autowired
  private Scheduler scheduler;

  @Autowired
  private JedisProvider jedisProvider;

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Override
  public void run(String... args) throws Exception {
    // 准备一些测试数据
    prepareOrderMessageData();
    JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
        .withIdentity("OrderMessageConsumer", "DelayTask")
        .build();
    // 触发器5秒触发一次
    Trigger trigger = TriggerBuilder.newTrigger()
        .withIdentity("OrderMessageConsumerTrigger", "DelayTask")
        .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
        .build();
    scheduler.scheduleJob(job, trigger);
  }

  private void prepareOrderMessageData() throws Exception {
    DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    try (Jedis jedis = jedisProvider.provide()) {
      List<OrderMessage> messages = Lists.newArrayList();
      for (int i = 0; i < 100; i++) {
        OrderMessage message = new OrderMessage();
        message.setAmount(BigDecimal.valueOf(i));
        message.setOrderId("ORDER_ID_" + i);
        message.setUserId((long) i);
        message.setTimestamp(LocalDateTime.now().format(f));
        messages.add(message);
      }
      // 这里暂时不使用Lua
      Map<String, Double> map = Maps.newHashMap();
      Map<String, String> hash = Maps.newHashMap();
      for (OrderMessage message : messages) {
        // 故意把score设计成30分钟前
        map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
        hash.put(message.getOrderId(), JSON.toJSONString(message));
      }
      jedis.zadd("ORDER_QUEUE", map);
      jedis.hmset("ORDER_DETAIL_QUEUE", hash);
    }
  }
}

输出结果如下:

2019-08-21 22:45:59.518  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:22 ms......
2019-08-21 22:46:04.515  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:1 ms......
......

首次执行的时候涉及到一些组件的初始化,会比较慢,后面看到由于我们只是简单打印订单信息,所以定时任务执行比较快。如果在不调整当前架构的情况下,生产中需要注意:

  • 切换 JobStoreJDBC 模式, Quartz 官方有完整教程,或者看笔者之前翻译的 Quartz 文档。
  • 需要监控或者收集任务的执行状态,添加预警等等。

这里其实有一个性能隐患,命令 ZREVRANGEBYSCORE 的时间复杂度可以视为为 O(N)N 是集合的元素个数,由于这里把所有的订单信息都放进了同一个 Sorted Set ( ORDER_QUEUE )中,所以在一直有新增数据的时候, dequeue 脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案。

小结

这篇文章主要从一个实际生产案例的仿真例子入手,分析了当前延时任务的一些实现方案,还基于 RedisQuartz 给出了一个完整的示例。当前的示例只是处于可运行的状态,有些问题尚未解决。下一篇文章会着眼于解决两个方面的问题:

  1. 分片。
  2. 监控。

还有一点, 架构是基于业务形态演进出来的,很多东西需要结合场景进行方案设计和改进,思路仅供参考,切勿照搬代码 。

以上所述是小编给大家介绍的使用Redis实现延时任务的解决方案,非常不错,具有一定的参考借鉴价值,需要的朋友参考下吧!

(0)

相关推荐

  • 利用Redis实现延时处理的方法实例

    背景 在开发中,往往会遇到一些关于延时任务的需求.例如 •生成订单30分钟未支付,则自动取消 •生成订单60秒后,给用户发短信 对上述的任务,我们给一个专业的名字来形容,那就是延时任务. 最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下. 实现过程 说到java中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下

  • python 监听salt job状态,并任务数据推送到redis中的方法

    salt分发后,主动将已完成的任务数据推送到redis中,使用redis的生产者模式,进行消息传送 #coding=utf-8 import fnmatch,json,logging import salt.config import salt.utils.event from salt.utils.redis import RedisPool import sys,os,datetime,random import multiprocessing,threading from joi.util

  • PHP swoole和redis异步任务实现方法分析

    本文实例讲述了PHP swoole和redis异步任务实现方法.分享给大家供大家参考,具体如下: redis异步任务 interface.php <?php for($i=0;$i<100;$i++){ $msg = "zhezhao[".$i."]"; $redis = new Redis(); $redis->connect("127.0.0.1"); $redis->publish("test",

  • 基于Redis实现分布式锁以及任务队列

    一.前言 双十一刚过不久,大家都知道在天猫.京东.苏宁等等电商网站上有很多秒杀活动,例如在某一个时刻抢购一个原价1999现在秒杀价只要999的手机时,会迎来一个用户请求的高峰期,可能会有几十万几百万的并发量,来抢这个手机,在高并发的情形下会对数据库服务器或者是文件服务器应用服务器造成巨大的压力,严重时说不定就宕机了,另一个问题是,秒杀的东西都是有量的,例如一款手机只有10台的量秒杀,那么,在高并发的情况下,成千上万条数据更新数据库(例如10台的量被人抢一台就会在数据集某些记录下 减1),那次这个

  • Node.js + Redis Sorted Set实现任务队列

    需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" } ,这样就需要间隔一段时间后再去调用第三方 API 获取数据.为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求.然后定时从任务队列里中取出任务调用第三方 API,若返回状态为"

  • 使用Redis实现延时任务的解决方案

    最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案.这篇文章记录了调研的过程,以及初步方案的实现. 候选方案对比 下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势. 方案 优势 劣势 选用场景 JDK 内置的延迟队列 DelayQueue 实现简单 数据内存态,不可靠 一致性相对低的场景 调度框架和 MySQL 进行短间隔轮询 实现简单,可靠性高 存在明显的性能瓶颈 数据量较少实时性相对低的场景 RabbitMQ 的 DLX 和 T

  • SpringBoot AOP Redis实现延时双删功能实战

    目录 一.业务场景 1.此时存在的问题 2.解决方案 3.为何要延时500毫秒? 4.为何要两次删除缓存? 二.代码实践 1.引入Redis和SpringBoot AOP依赖 2.编写自定义aop注解和切面 3.application.yml 4.user_db.sql脚本 5.UserController 6.UserService 三.测试验证 四.代码工程及地址 一.业务场景 在多线程并发情况下,假设有两个数据库修改请求,为保证数据库与redis的数据一致性,修改请求的实现中需要修改数据库

  • redis实现延时队列的两种方式(小结)

    背景 项目中的流程监控,有几种节点,需要监控每一个节点是否超时.按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢:2,时间不好控制,太短,怕一次处理不完,太长状态就会有延迟.所以就想到用延迟队列的方式去实现. 一,redis的过期key监控 1,开启过期key监听 在redis的配置里把这个注释去掉 notify-keyspace-events Ex 然后重启redis 2,使用redis过期监听实现延迟队列 继承KeyExpirationEventMess

  • Redis做数据持久化的解决方案及底层原理

    目录 数据持久化 RDB 生成方法 save bgsave 优点 缺点 AOF AOF记录过程 ServerCron 作用 server.hz 写入策略 End 之前的文章介绍了Redis的简单数据结构的相关使用和底层原理,这篇文章我们就来聊一下Redis应该如何保证高可用. 数据持久化 我们知道虽然单机的Redis虽然性能十分的出色, 单机能够扛住10w的QPS,这是得益于其基于内存的快速读写操作,那如果某个时间Redis突然挂了怎么办?我们需要一种持久化的机制,来保存内存中的数据,否则数据就

  • 浅谈Redis哨兵模式高可用解决方案

    目录 一.序言 1.目标与收获 2.端口规划 二.单机模拟 (一)服务规划 1.Redis实例 2.哨兵服务 (二)服务配置 1.Redis实例 2.哨兵服务 (三)服务管理 1.Redis实例 2.哨兵服务 三.客户端整合 (一)基础整合 1.全局配置文件 2.集成配置 (二)读写分离 一.序言 Redis高可用有两种模式:哨兵模式和集群模式,本文基于哨兵模式搭建一主两从三哨兵Redis高可用服务. 1.目标与收获 一主两从三哨兵Redis服务,基本能够满足中小型项目的高可用要求,使用Supe

  • redis实现分布式session的解决方案

    目录 一.首先Session 二.分布式Session 补充: 一.首先Session Session 是客户端与服务器通讯会话技术, 比如浏览器登陆.记录整个浏览会话信息.session存放在服务器,关闭浏览器不会失效. Session实现原理 客户对向服务器端发送请求后,Session 创建在服务器端,返回Sessionid给客户端浏览器保存在本地,当下次发送请求的时候,在请求头中传递sessionId获取对应的从服务器上获取对应的Sesison 请求过程: 服务器端接受到客户端请求,会创建

  • 面试分析分布式架构Redis热点key大Value解决方案

    目录 引言 1.面试官:你在项目中有没有遇到Redis热点数据问题,一般都是什么原因引起的? 2.面试官:真实项目中,那热点数据问题你是如何准确定位的呢? 3.如何解决热点数据问题 4.面试官:关于Redis最后一个问题,Redis支持丰富的数据类型,那么这些数据类型存储的大Value如何解决,线上有遇到这种情况吗? 总结 引言 关于 Redis 热点数据 & 大 key 大 value 问题也是容易被问的高阶问题,不如一次痛快点说完,让面试官无话可说,个人工作经验中,热点数据问题在工作中相比雪

  • 基于Redis实现延时队列的优化方案小结

    目录 一.延时队列的应用 二.延时队列的实现 三.总结 一.延时队列的应用 近期在开发部门的新项目,其中有个关键功能就是智能推送,即根据用户行为在特定的时间点向用户推送相应的提醒消息,比如以下业务场景: 在用户点击充值项后,半小时内未充值,向用户推送充值未完成提醒. 在用户最近一次阅读行为2小时后,向用户推送继续阅读提醒. 在用户新注册或退出应用N分钟后,向用户推送合适的推荐消息. … 上述场景的共同特征就是在某事件触发后延迟一定时间后再执行特定任务,若事件触发时间点可知,则上述逻辑也可等价于在

  • redis缓存延时双删的原因分析

    缓存为啥是删除,而不是更新? 如果是更新,存在分布式事务问题,可能出现修改了缓存,数据库修改失败的情况.只是删除缓存的话,就算数据库修改失败,下次查询会直接取数据库的数据,也不会出现脏数据. 延时双删是什么? 就是在增删改某实体类的时候,要对该实体类的缓存进行清空,清空的位置在数据库操作方法的前后. 采用反证法 只先删  只后删 结论 从而得出 前删和后删都有问题.所以采用延时双删的策略 思考2:为啥是延时 依然是反证法.下图这情况是双删依然存在旧缓存的情况,延时是确保 修改数据库->清

  • Mysql和redis缓存不一致问题的解决方案

    目录 一.问题描述 二.解决方案 1.给缓存数据设置过期时间 2.缓存延时双删 3.删除缓存重试机制 4.读取biglog异步删除缓存 三.总结 一.问题描述 redis.mysql双写缓存不一致: 在更新缓存方面,对于更新完数据库,是更新缓存呢,还是删除缓存.又或者是先删除缓存,再更新数据库,其实大家存在很大的争议.于是博主战战兢兢,写了这篇文章. 二.解决方案 1.给缓存数据设置过期时间 先做一个说明,从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案.这种方案下,我们可以对存入缓

随机推荐