Redisson延迟队列执行流程源码解析

目录
  • 引言
  • demo示例
  • SUBSCRIBE指令
  • zrangebyscore和zrange指令
  • BLPOP指令
  • 最后定时器源码解析
  • 总结:

引言

在实际分布式项目中延迟任务一般不会使用JDK自带的延迟队列,因为它是基于JVM内存存储,没有持久化操作,所以当服务重启后就会丢失任务。

在项目中可以使用MQ死信队列或redisson延迟队列进行处理延迟任务,本篇文章将讲述redisson延迟队列的使用demo和其执行源码。

demo示例

通过脚手架创建一个简易springboot项目,引入redisson的maven依赖,并简单配置redisson连接属性。

    <!-- redisson引用 -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.16.6</version>
    </dependency>
@Configuration
public class RedissonConfig {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    /**
     * 获取redissonClient实例
     *
     * @return
     * @throws Exception
     */
    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        String address = "redis://" + host + ":" + port;
        config.useSingleServer().setAddress(address);
        return Redisson.create(config);
    }
}

定义一个redisson延迟队列插入和获取任务处理类RedissonQueueHandle,通过控制spring的bean加载周期开启独立线程获取延迟任务。这里获取延迟任务使用了三种方法,除了第一种阻塞式获取任务方法外,其他两种方法都不是百分比按照延迟参数获取到任务,因为是时间间隔定时循环获取延迟任务。

/**
 * redisson延迟队列处理器
 *
 * @author zrh
 */
@Slf4j
@Component
public class RedissonQueueHandle implements InitializingBean {
    private final RBlockingQueue<RedisDataEntity<?>> queue;
    private final RDelayedQueue<RedisDataEntity<?>> delayedQueue;
    public RedissonQueueHandle (RedissonClient client) {
        this.queue = client.getBlockingQueue("redisson:queue");
        this.delayedQueue = client.getDelayedQueue(queue);
    }
    @Override
    public void afterPropertiesSet () {
        // 开一个线程阻塞式获取任务
        thread();
        // 使用netty时间轮循环获取任务
//        watchDog(new HashedWheelTimer());
        // 使用线程池定时获取任务
//        schedule();
    }
    private void thread () {
        new Thread(() -> {
            while (true) {
                try {
                    RedisDataEntity entity = queue.take();
                    log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - entity.getTime());
                } catch (Exception e) {
                }
            }
        }, "zrh").start();
    }
    private void watchDog (final HashedWheelTimer timer) {
        timer.newTimeout(timeout -> {
            RedisDataEntity entity = queue.poll();
            if (null != entity) {
                log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - entity.getTime());
            }
            watchDog(timer);
        }, 3, TimeUnit.SECONDS);
    }
    private void schedule () {
        Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(() -> {
            RedisDataEntity entity = queue.poll();
            if (null != entity) {
                log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - entity.getTime());
            }
        }, 5, 5, TimeUnit.SECONDS);
    }
    /**
     * 放入redis,定时过期
     *
     * @param entity
     */
    public void offer (RedisDataEntity entity) {
        try {
            delayedQueue.offer(entity, entity.getExpire(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("放入redis延迟队列异常", e);
        }
    }
}

放入redisson延迟队列可以是字符串也可以是对象RedisDataEntity,因为有进行IO磁盘存储操作,所以必须实现Serializable序列化接口。

/**
 * @Author: ZRH
 * @Date: 2022/1/10 11:54
 */
@Data
public class RedisDataEntity<T> implements Serializable {
    /**
     * 数据
     */
    private final T data;
    /**
     * 过期时间(单位:毫秒)
     */
    private final Long expire;
    /**
     * 添加时间
     */
    private final Long time;
    public RedisDataEntity (T data, Long expire, Long time) {
        this.data = data;
        this.expire = expire;
        this.time = time;
    }
}

然后开一个插入数据接口:

/**
 * @Author: ZRH
 * @Date: 2022/1/10 11:45
 */
@Slf4j
@RestController
public class IndexController {
    private final RedissonQueueHandle redisHandle;
    public IndexController (RedissonQueueHandle redisHandle) {
        this.redisHandle = redisHandle;
    }
    @PostMapping("redissonQueue")
    public String redissonQueue (@RequestParam String data, @RequestParam Long expire) {
        RedisDataEntity entity = new RedisDataEntity(data, expire, System.currentTimeMillis());
        log.info("本次添加数据:{}", entity);
        redisHandle.offer(entity);
        return "ok";
    }
}
访问接口设置延迟30秒:http://localhost:8802/redissonQueue?data=a&expire=30000,打印结果如下
2022-01-14 14:21:52.140  INFO 10808 --- [nio-8802-exec-1] c.r.web.controller.IndexController       : 本次添加数据:RedisDataEntity(data=a, expire=30000, time=1642141312135)
2022-01-14 14:21:52.887  INFO 10808 --- [nio-8802-exec-2] c.r.web.controller.IndexController       : 本次添加数据:RedisDataEntity(data=a, expire=30000, time=1642141312887)
2022-01-14 14:22:22.240  INFO 10808 --- [            zrh] c.r.web.redis.RedissonQueueHandle        : 本次获取数据:RedisDataEntity(data=a, expire=30000, time=1642141312135),耗时:30105
2022-01-14 14:22:22.914  INFO 10808 --- [            zrh] c.r.web.redis.RedissonQueueHandle        : 本次获取数据:RedisDataEntity(data=a, expire=30000, time=1642141312887),耗时:30027

初始执行流程源码解析 redisson延迟队列最终都是和redis服务进行交互的,那可以使用monitor命令查看redis中执行了哪些命令,这样对了解其执行流程有很大帮助。

上图是项目启动时,对redis发送的几个指令

"SUBSCRIBE":订阅队列"redisson_delay_queue_channel:{redisson:queue}",里面有个定时任务通过该队列获取数据

"zrangebyscore":获取"redisson_delay_queue_timeout:{redisson:queue}"集合中排序score值在0到1642148406748(当前时间戳)内的前100元素

"zrange":获取"redisson_delay_queue_timeout:{redisson:queue}"集合中第一个元素,用于获取下一个元素的到期时间

"BLPOP":取出并移除"redisson:queue"列表里的第一个元素,如果没有元素就一直等待阻塞。所以这里会阻塞着

"rpush":如果指令"zrangebyscore"获取到了元素,那就将元素推送到队列redisson:queue内

"lrem":如果指令"zrangebyscore"获取到了元素,那就删除队列"redisson_delay_queue:{redisson:queue}内元素为v的第一个元素

SUBSCRIBE指令

进入RedissonDelayedQueue延迟队列的构造函数,里面就有上述执行指令的lua脚本命令(为了不影响篇幅删了一部分代码,下同):

    ......
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        // list结构,用于延迟队列的订阅发布
        channelName = prefixName("redisson_delay_queue_channel", getRawName());
        // list结构,存放元素原始顺序
        queueName = prefixName("redisson_delay_queue", getRawName());
        // zset结构,存放未到期元素,并按照过期时间进行排好序
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());
        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
                      System.currentTimeMillis(), 100);
            }
            @Override
            protected RTopic getTopic() {
                return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        queueTransferService.schedule(queueName, task);
        this.queueTransferService = queueTransferService;
    }

继续跟进queueTransferService.schedule(queueName, task)方法,因为第一次进入tasks集合,所以最后执行start()方法:

    ......
    private final ConcurrentMap<String, QueueTransferTask> tasks = new ConcurrentHashMap<>();
    public synchronized void schedule(String name, QueueTransferTask task) {
        QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();
        } else {
            oldTask.incUsage();
        }
    }

进入QueueTransferTask,继续跟进schedulerTopic.addListener(...)方法:

    ......
    private int messageListenerId;
    private int statusListenerId;
    public void start() {
        RTopic schedulerTopic = getTopic();
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });
        messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }

然后会进入PublishSubscribeService.subscribe(...)方法:

注意:这里继续调用重载方法subscribe(...)时设置了参数:PubSubType.SUBSCRIBE

    ......
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
        return subscribe(PubSubType.SUBSCRIBE, codec, channelName, getEntry(channelName), listeners);
    }
    private RFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName, MasterSlaveEntry entry, RedisPubSubListener<?>... listeners) {
        RPromise<PubSubConnectionEntry> promise = new RedissonPromise<>();
        AsyncSemaphore lock = getSemaphore(channelName);
        // 创建一个线程任务放入lock对象
        lock.acquire(() -> {
            if (promise.isDone()) {
                lock.release();
                return;
            }
            subscribe(codec, channelName, entry, promise, type, lock, listeners);
        });
        return promise;
    }

AsyncSemaphore对象的acquire(...)方法会把线程任务放入自身队列listeners里,然后依次读取执行线程任务;

public class AsyncSemaphore {
    private final AtomicInteger counter;
    private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();
    public void acquire(Runnable listener) {
        listeners.add(listener);
        tryRun();
    }
    private void tryRun() {
        if (counter.decrementAndGet() >= 0) {
            Runnable listener = listeners.poll();
            if (listener == null) {
                counter.incrementAndGet();
                return;
            }
            listener.run();
        } else {
            if (counter.incrementAndGet() > 0) {
                tryRun();
            }
        }
    }
}

然后继续跟进方法subscribe(codec, channelName, entry, promise, type, lock, listeners):

    .....
    private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry entry,
                            RPromise<PubSubConnectionEntry> promise, PubSubType type,
                            AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
        if (connEntry != null) {
            addListeners(channelName, promise, type, lock, connEntry, listeners);
            return;
        }
        freePubSubLock.acquire(() -> {
            if (promise.isDone()) {
                lock.release();
                freePubSubLock.release();
                return;
            }
            MasterSlaveEntry msEntry = Optional.ofNullable(connectionManager.getEntry(entry.getClient())).orElse(entry);
            // 第一次进入entry2PubSubConnection集合为null,所以使用默认值,最后 freeEntry == null
            PubSubEntry freePubSubConnections = entry2PubSubConnection.getOrDefault(msEntry, new PubSubEntry());
            PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
            if (freeEntry == null) {
                freePubSubLock.release();
                connect(codec, channelName, msEntry, promise, type, lock, listeners);
                return;
            }
            ......
        });
    }

继续跟进方法connect(codec, channelName, msEntry, promise, type, lock, listeners):

    ......
    private void connect(Codec codec, ChannelName channelName,
                         MasterSlaveEntry msEntry, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
        RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(msEntry, channelName);
        promise.onComplete((res, e) -> {...});
        connFuture.onComplete((conn, ex) -> {
            if (ex != null) {...}
            freePubSubLock.acquire(() -> {
                PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
                int remainFreeAmount = entry.tryAcquire();
                PubSubKey key = new PubSubKey(channelName, msEntry);
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry);
                if (oldEntry != null) {...}
                if (remainFreeAmount > 0) {
                    addFreeConnectionEntry(channelName, entry);
                }
                freePubSubLock.release();
                RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);
                ChannelFuture future;
                // 这里通过上述重载方法传递的参数可知,最后走else逻辑
                if (PubSubType.PSUBSCRIBE == type) {
                    future = entry.psubscribe(codec, channelName);
                } else {
                    future = entry.subscribe(codec, channelName);
                }
                future.addListener((ChannelFutureListener) future1 -> {
                    if (!future1.isSuccess()) {...}
                    connectionManager.newTimeout(timeout ->
                            subscribeFuture.cancel(false),
                            config.getTimeout(), TimeUnit.MILLISECONDS);
                });
            });
        });
    }

该方法中支线内容不表述,主要看方法 entry.subscribe(codec, channelName),最后进入RedisPubSubConnection.async(...)方法,就是发送SUBSCRIBE指令的流程:

zrangebyscore和zrange指令

订阅指令SUBSCRIBE发出后,在QueueTransferTask.start()方法里添加的监听器触发了,就会执行pushTask()

pushTaskAsync()方法执行完(lua脚本执行完),就会开启一个定时任务scheduleTask()

    ......
    protected abstract RTopic getTopic();
    protected abstract RFuture<Long> pushTaskAsync();
    private void pushTask() {
        // 这个抽象方法在之前构建RedissonDelayedQueue对象的构造函数里有实现,最后返回元素过期时间
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }
            if (res != null) {
                scheduleTask(res);
            }
        });
    }

BLPOP指令

当RedissonDelayedQueue延迟队列构造完成后,会调用延迟队列的take()方法获取延迟任务,然后会进入RedissonBlockingQueue.takeAsync()方法:

    ......
    @Override
    public RFuture<V> takeAsync() {
        return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0);
    }
    /*
     * (non-Javadoc)
     * @see java.util.concurrent.BlockingQueue#take()
     */
    @Override
    public V take() throws InterruptedException {
        return commandExecutor.getInterrupted(takeAsync());
    }
    ......

注意这里的参数其值为 BLPOP,很明显这里就是和我们要找的BLPOP指令有关,所以这里其实就是客户端通过BLPOP指令阻塞式获取值。在客户端开个线程一直循环阻塞获取元素即可;

看下源码继续向下进入CommandAsyncService.writeAsync(...)方法,然后继续向下进入RedisExecutor.execute()方法:

    ......
    public void execute() {
        if (mainPromise.isCancelled()) {...}
        if (!connectionManager.getShutdownLatch().acquire()) {...}
        codec = getCodec(codec);
        // 获取连接
        RFuture<RedisConnection> connectionFuture = getConnection();
        RPromise<R> attemptPromise = new RedissonPromise<>();
        mainPromiseListener = (r, e) -> {...};
        if (attempt == 0) {...}
        scheduleRetryTimeout(connectionFuture, attemptPromise);
        connectionFuture.onComplete((connection, e) -> {
            if (connectionFuture.isCancelled()) {...}
            if (!connectionFuture.isSuccess()) {...}
            // 连接获取成功就执行当前方法
            sendCommand(attemptPromise, connection);
            writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
            });
        });
        attemptPromise.onComplete((r, e) -> {...});
    }

该方法里一些支线方法按下不表。中间有个超时重试机制,使用netty的时间轮,不是重点也就不表述了。

先获取写入操作连接对象任务,然后进入方法sendCommand(attemptPromise, connection)发送

指令指令:"BLPOP",参数:"redisson:queue" "0"

offer添加任务流程源码解析 项目启动完成后,添加一个延迟任务到redis中,查看redis中所执行的指令:

然后跟进插入元素offer方法,进入RedissonDelayedQueue.offerAsync()方法内,如下所示:

    ......
    @Override
    public void offer(V e, long delay, TimeUnit timeUnit) {
        get(offerAsync(e, delay, timeUnit));
    }
    @Override
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        if (delay < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
        long randomId = ThreadLocalRandom.current().nextLong();
        return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime
              // to all scheduler workers
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;",
              Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
              timeout, randomId, encode(e));
    }

其中很明显一长串的脚本命令就是在redis中执行的指令,基本流程比较简单:

"zadd":这是向zset集合"redisson_delay_queue_timeout:{redisson:queue}"里添加元素数据(此数据被处理过,不用管其结构),排序值为当前时间戳+延迟时间

"rpush":把元素数据推送到list队列"redisson:queue"

"zrange":获取zset集合"redisson_delay_queue_timeout:{redisson:queue}"中排好序的第一个元素

"publish":如果上述获取的元素是本次插入的元素,那就发布通知队列"redisson_delay_queue_channel:{redisson:queue}",内容为当前元素的过期时间,这样做是为了减少本次元素到期的时间差。

最后定时器源码解析

定时器任务主要是通过监听器监听到了有新的客户端订阅或元素通知发布出来时,就会执行pushTask()和scheduleTask(...)方法:

    ......
    private int messageListenerId;
    private int statusListenerId;
    public void start() {
        RTopic schedulerTopic = getTopic();
        // 当有新的客户端订阅schedulerTopic,就是触发执行pushTask()方法
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });
        // 当redis有新的消息通知,就会触发scheduleTask(...)方法,startTime为上述中publish通知的元素过期时间
        messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }

pushTask()方法是对redis延迟队列进行操作的方法,scheduleTask(...)是netty时间轮来控制调用pushTask()方法,所以pushTask()和scheduleTask()互相调用。

    ......
    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {...}
        if (oldTimeout != null) {...}
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();
                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }
    protected abstract RTopic getTopic();
    protected abstract RFuture<Long> pushTaskAsync();
    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }
            if (res != null) {
                scheduleTask(res);
            }
        });
    }

总结:

当有新的客户端进行订阅,就调用pushTask()方法拉取数据放入阻塞队列。当有信的消息进行发布,就调用scheduleTask(...)方法,并根据其过期时间判断是通过时间轮延迟调用还是立即调用pushTask()方法。最后 redisson延迟队列的源码相对而言其实是比较抽象复杂的,感觉没有其分布式锁这块源码容易解析。但仔细用心去看,跟着主要方法走还是可以了解其执行流程。

以上就是Redisson延迟队列执行流程源码解析的详细内容,更多关于Redisson延迟队列执行流程的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot集成Redisson实现延迟队列的场景分析

    使用场景 1.下单成功,30分钟未支付.支付超时,自动取消订单 2.订单签收,签收后7天未进行评价.订单超时未评价,系统默认好评 3.下单成功,商家5分钟未接单,订单取消 4.配送超时,推送短信提醒 ...... 对于延时比较长的场景.实时性不高的场景,我们可以采用任务调度的方式定时轮询处理.如:xxl-job 今天我们采用一种比较简单.轻量级的方式,使用 Redis 的延迟队列来进行处理.当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案.如:使用消息中间件Kafka.Rabbi

  • spring boot集成redisson的最佳实践示例

    目录 前言 集成jedis实例,xml方式 集成前引用的jar springbean配置xml 集成redisson实例,javabean的方式 集成前引入的jar javabean配置如下 提供实例化javabean application.properties添加如下配置 前言 本文假使你了解spring boot并实践过,非spring boot用户可跳过也可借此研究一下. redisson是redis的java客户端程序,国内外很多公司都有在用,如下, 和spring的集成中官方给出的实

  • 分布式利器redis及redisson的延迟队列实践

    目录 前言碎语 延迟队列多种实现方式 redisson中的延迟队列实现 文末结语 前言碎语 首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了.这个需求如果不是准时通知,而是每天定点通知就简单了.如果需要准时通知就只能上延迟队列了.使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等. 延迟队列多种实现方式 1.如基于RabbitMQ的队列ttl+死信路由策略:通过设置一个队列的超时未消费时间,配合死信路由策略,到达时间未消费后

  • 生产redisson延时队列不消费问题排查解决

    目录 问题描述 初步排查 排查过程 解决方案 redisson 延时队列原理 流程总结 问题描述 项目使用redisson延时队列功能,实现直播的开播提醒,突然有一天业务爆出问题,未触发开播提醒. 初步排查 首先通过查询生产日志,发送端日志存在,没有消费日志,猜测消费端没有消费到延时消息,,在dba的协助下查询redis队列,消息也确实存在,但已经过了过期时间,由此证明redisson消费者出现问题.通过服务日志发现在最后一次设置自定义推送任务是在一次服务发布之前,服务发布后,之前设置的自定义推

  • Redisson 主从一致性问题详解

    目录 Redisson 主从一致性 Java 实现 mutilLock RedissonConfig.java TestRedisson.java Redisson 主从一致性 我们先来说一下 Redis 的主从模式,Redis Master(主节点)中处理所有发向 Redis 的写操作(增删改),Redis Slave (从节点)只负责处理读操作,主节点会不断将自己的数据同步给从节点,确保主从之间的数据一致性,但是数据同步会存在一定的延时,主从一致性问题就是因为延时而导致的 比如我们通过 se

  • Quarkus集成redis操作Redisson实现数据互通

    目录 前言 集成redis 复制Redisson序列化 使用 前言 博主所在公司大量使用了redis缓存,redis客户端用的Redisson.在Quarkus集成redis时,博主尝试使用Redisson客户端直接集成,发现,在jvm模式下运行quarkus没点问题,但是在打native image时,就报错了,尝试了很多方式都是莫名其妙的异常.最后决定采用quarkus官方的redis客户端,但是Redisson客户端数据序列化方式是特有的,不是简单的String,所以quarkus中的re

  • Redisson延迟队列执行流程源码解析

    目录 引言 demo示例 SUBSCRIBE指令 zrangebyscore和zrange指令 BLPOP指令 最后定时器源码解析 总结: 引言 在实际分布式项目中延迟任务一般不会使用JDK自带的延迟队列,因为它是基于JVM内存存储,没有持久化操作,所以当服务重启后就会丢失任务. 在项目中可以使用MQ死信队列或redisson延迟队列进行处理延迟任务,本篇文章将讲述redisson延迟队列的使用demo和其执行源码. demo示例 通过脚手架创建一个简易springboot项目,引入rediss

  • Spring Security过滤器链加载执行流程源码解析

    目录 Spring Security实现原理 一.Spring Security过滤器链加载 1.注册名为 springSecurityFilterChain的过滤器 2.查看 DelegatingFilterProxy类 3.查看 FilterChainProxy类 3.1 查看 doFilterInternal方法. 3.2 查看 getFilters方法. 4 查看 SecurityFilterChain接口 5 查看 SpringBootWebSecurityConfiguration类

  • Android开发OkHttp执行流程源码分析

    目录 前言 介绍 执行流程 OkHttpClient client.newCall(request): RealCall.enqueue() Dispatcher.enqueue() Interceptor RetryAndFollowUpInterceptor BridgeInterceptor CacheInterceptor 前言 OkHttp 是一套处理 HTTP 网络请求的依赖库,由 Square 公司设计研发并开源,目前可以在 Java 和 Kotlin 中使用. 对于 Androi

  • Redis命令处理过程源码解析

    本文基于社区版Redis 4.0.8 1.命令解析 Redis服务器接收到的命令请求首先存储在客户端对象的querybuf输入缓冲区,然后解析命令请求的各个参数,并存储在客户端对象的argv和argc字段. 客户端解析命令请求的入口函数为readQueryFromClient,会读取socket数据存储到客户端对象的输入缓冲区,并调用函数processInputBuffer解析命令请求. 注:内联命令:使用telnet会话输入命令的方式 void processInputBuffer(clien

  • Autowired的注入过程源码解析

    目录 一.案例场景 二.案例解析 三.问题修正 一.案例场景 在使用 @Autowired 时,你或多或少都会遇过类似的错误: required a single bean, but 2 were found 为了重现这个错误,我们可以先写一个案例来模拟下. @RestController @Slf4j @Validated public class StudentController { @Autowired DataService dataService; @RequestMapping(p

  • Dubbo3的Spring适配原理与初始化流程源码解析

    目录 引言 Spring Context Initialization FactoryBean BeanDefinition 初始化bean 解决依赖 解决属性 Dubbo Spring的一些问题及解决办法 Dubbo spring 2.7 初始化过程 Dubbo spring 3的初始化过程 属性占位符解决失败 ReferenceBean被过早初始化问题 Reference注解可能出现@Autowire注入失败的问题 引言 Dubbo 国内影响力最大的开源框架之一,非常适合构建大规模微服务集群

  • Kubernetes kubectl中Pod创建流程源码解析

    目录 确立目标 先写一个Pod的Yaml 部署Pod 查询Pod kubectl create 的调用逻辑 Main Match Command Create RunCreate Summary 确立目标 从创建pod的全流程入手,了解各组件的工作内容,组件主要包括以下 kubectl kube-apiserver kube-scheduler kube-controller kubelet 理解各个组件之间的相互协作,目前是kubectl 先写一个Pod的Yaml apiVersion: v1

  • SpringBoot整合Spring Security过滤器链加载执行流程源码分析(最新推荐)

    目录 1.引言 2.Spring Security过滤器链加载 2.1.注册名为 springSecurityFilterChain的过滤器 3.查看 DelegatingFilterProxy类 4.查看 FilterChainProxy类 4.1 查看 doFilterInternal方法 4.2 查看 getFilters方法 5 查看 SecurityFilterChain接口 6. 查看 SpringBootWebSecurityConfiguration类 总结: 1.引言 在 Sp

  • Spring Transaction事务实现流程源码解析

    目录 一.基于xml形式开启Transaction 1. 创建数据库user 2. 创建一个maven 项目 3. 通过xml形式配置事务 1) 创建Spring命名空间 2) 开启事务配置 3) 创建UserService类 4. 测试事务 1) 抛出RuntimeException 2) 注释掉RuntimeException 二.事务开启入口TxNamespaceHandler AnnotationDrivenBeanDefinitionParser 三.AOP驱动事务 Transacti

  • SpringMVC请求流程源码解析

    目录 一.SpringMVC使用 1.工程创建 2.工程配置 3.启动工程 二.SpringMVC启动过程 1.父容器启动过程 2.子容器启动过程(SpringMvc容器) 3.九大组件的初始化 1.处理器映射器的初始化 2.处理器适配器的初始化 4.拦截器的初始化 三.SpringMVC请求过程 1.请求流程图 2.业务描述 一.SpringMVC使用 1.工程创建 创建maven工程. 添加java.resources目录. 引入Spring-webmvc 依赖. <dependency>

随机推荐