redis用list做消息队列的实现示例

目录
  • 生产消息服务
  • 消费消息服务,定时任务
  • 日志
  • 测试

leftPush消息入队,rightPop对应,消息出队。

rightPop(RedisConstant.MQ_LIST, 0L, TimeUnit.SECONDS)阻塞出队,0表示永久阻塞

生产消息服务

@Service
public class RedisService {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public Object publish() {
        OrderDTO dto = new OrderDTO();
        dto.setId(1);
        dto.setCreateTime(new Date());
        dto.setMoney("12.34");
        dto.setOrderNo("orderNo1");
        String s = JSON.toJSONString(dto);

        ListOperations<String, String> listOperations = redisTemplate.opsForList();
        //leftPush和rightPop对应,左边入队,右边出队
        listOperations.leftPush(RedisConstant.MQ_LIST, s);

        //因为出队是阻塞读取的,所以上一步入队后,数据立刻就被驱走了,下一步size=0
        Long size = listOperations.size(RedisConstant.MQ_LIST);
        List<String> list = new ArrayList<>();
        if (size != null && size > 0) {
             list = listOperations.range(RedisConstant.MQ_LIST, 0, size - 1);
        }
        return list;

    }
}

测试

@RestController
@RequestMapping("redisList")
public class RedisListController {

    @Autowired
    private RedisService redisService;

    @GetMapping("publish")
    public Object publish() {
        return redisService.publish();
    }
}

消费消息服务,定时任务

@Component
public class RedisConsumeTask {
    @Autowired
    private RedisService redisService;

    @TaskLock(RedisConstant.CONSUME_REDIS_LIST)
    @Scheduled(cron = "0/10 * * * * ?")
    public void consumeMqList() {
        redisService.consumeMqList();
    }
}

@Service
@Slf4j
public class RedisService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void consumeMqList() {
        ListOperations<String, String> listOperations = redisTemplate.opsForList();
        //0时间,表示阻塞永久
        //待机一小时后,再次发消息,消费不了了,阻塞有问题啊。还得轮寻啊
        //String s = listOperations.rightPop(RedisConstant.MQ_LIST, 0L, TimeUnit.SECONDS);
        String s = listOperations.rightPop(RedisConstant.MQ_LIST);
        if (s == null) {
            return;
        }

        log.info("{} = {}", RedisConstant.MQ_LIST, s);

        OrderDTO dto = JSON.parseObject(s, OrderDTO.class);
        log.info("dto = {}", dto);
    }
}

日志

@Component
@Aspect
public class TaskLockAop {

    @Autowired
    private RedisLockRegistry redisLockRegistry;

    @Around("execution(@TaskLock * * (..))")
    public Object taskAround(ProceedingJoinPoint pjp) throws Throwable {

        TaskLock taskAnnotation = ((MethodSignature)pjp.getSignature()).getMethod().getAnnotation(TaskLock.class);

        String lockKey = taskAnnotation.value();
        Lock lock = redisLockRegistry.obtain(lockKey);
        try {
            lock.tryLock(30L, TimeUnit.SECONDS);
            System.out.println("任务开始, " + lockKey + ", " + new Date());

            return pjp.proceed();

        } finally {
            lock.unlock();
            System.out.println("任务结束, " + lockKey + ", " + new Date());
        }
    }
}

测试

http://localhost:9040/redisList/publish

["{“createTime”:1574394538430,“id”:1,“money”:“12.34”,“orderNo”:“orderNo1”}"]

下面一直阻塞,任务开始了,不收到消息,永远不会结束。
阻塞有问题,改用轮询了。

先启动发送消息服务,发送消息。后启动消费消息服务,可以消费消息。这一点,比发布订阅要稳定。

关联项目https://github.com/mingwulipo/cloud-demo.git

到此这篇关于redis用list做消息队列的实现示例的文章就介绍到这了,更多相关redis list消息队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Redis 使用 List 实现消息队列的优缺点

    目录 什么是消息队列 消息队列满足哪些特性 消息有序性 重复消息处理 可靠性 List 实现消息队列 LPUSH RPOP 实时消费问题 重复消费 消息可靠性 需要注意的是 Redission 实战 添加依赖 Java 代码实战 单元测试 总结 需要注意的是,我们要避免生产者过快,消费者过慢导致的消息堆积占用 Redis 的内存. 分布式系统中必备的一个中间件就是消息队列,通过消息队列我们能对服务间进行异步解耦.流量消峰.实现最终一致性. 目前市面上已经有 RabbitMQ.RochetMQ.A

  • redis用list做消息队列的实现示例

    目录 生产消息服务 消费消息服务,定时任务 日志 测试 leftPush消息入队,rightPop对应,消息出队. rightPop(RedisConstant.MQ_LIST, 0L, TimeUnit.SECONDS)阻塞出队,0表示永久阻塞 生产消息服务 @Service public class RedisService {     @Autowired     private RedisTemplate<String, String> redisTemplate;     publi

  • RabbitMQ消息队列实现延迟任务示例

    目录 一.序言 1.实现原理 2.组件选型 二.方案设计 (一)服务器 (二)生产者 (三)消费者 三.SpringBoot实现 (一)生产者 (二)消费者 (三)通用工具包 一.序言 延迟任务应用广泛,延迟任务典型应用场景有订单超时自动取消:支付回调重试.其中订单超时取消具有幂等性属性,无需考虑重复消费问题:支付回调重试需要考虑重复消费问题. 延迟任务具有如下特点:在未来的某个时间点执行:一般仅执行一次. 1.实现原理 生产者将带有延迟信息的消息发送到RabbitMQ交换机中,等待延迟时间结束

  • 深入理解redis分布式锁和消息队列

    最近博主在看redis的时候发现了两种redis使用方式,与之前redis作为缓存不同,利用的是redis可设置key的有效时间和redis的BRPOP命令. 分布式锁 由于目前一些编程语言,如PHP等,不能在内存中使用锁,或者如Java这样的,需要一下更为简单的锁校验的时候,redis分布式锁的使用就足够满足了. redis的分布式锁其实就是基于setnx方法和redis对key可设置有效时间的功能来实现的.基本用法比较简单. public boolean tryLock(String loc

  • 详解Redis用链表实现消息队列

    前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换.个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样. 链表实现消息队列 Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型.可以利用lpush和rpop来实现.但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源

  • 使用PHP访问RabbitMQ消息队列的方法示例

    本文实例讲述了使用PHP访问RabbitMQ消息队列的方法.分享给大家供大家参考,具体如下: 扩展安装 PHP访问RabbitMQ实际使用的是AMQP协议,所以我们只要安装epel库中的php-pecl-amqp这个包即可 rpm -ivh http://mirror.neu.edu.cn/fedora/epel/6/x86_64/epel-release-6-8.noarch.rpm yum install php-pecl-amqp 交换建立 <?php $connection = new

  • go+redis实现消息队列发布与订阅的详细过程

    在做项目过程中,实现websocket得时候,不知道哪里写的不太合适,客户端消息收到一定程度,剩下的消息收不到,修改了缓冲区大小,还是没有解决问题,后面因为项目结束期比较紧张,没有时间调试消息的时候,改用了redis队列去做了消息得暂存,客户端轮询去拿对应的消息. 1.生产者随机发布消息,用rpush发布.2.消费者用lpop订阅消费,一旦没有消息,随机休眠.redis做消息队列的缺点:没有持久化.一旦消息没有人消费,积累到一定程度后就会丢失 package main import ( "fmt

  • 使用 Redis 流实现消息队列的代码

    在介绍了 Redis 流的基本功能之后, 现在是时候使用这些功能来构建一些实际的应用了. 消息队列作为流的典型应用之一, 具有非常好的示范性, 因此我们将使用 Redis 流的相关功能构建一个消息队列应用, 这个消息队列跟我们之前使用其他 Redis 数据结构构建的消息队列具有相似的功能. 代码清单 10-1 展示了一个具有基本功能的消息队列实现: 代码最开头的是几个转换函数, 它们负责对程序的相关输入输出进行转换和格式化: MessageQueue 类用于实现消息队列, 它的添加消息.移除消息

  • Spring Boot 使用 Disruptor 做内部高性能消息队列

    目录 Disruptor介绍 Disruptor 的核心概念 Ring Buffer Sequence Disruptor Sequencer Sequence Barrier Wait Strategy Event EventProcessor EventHandler Producer 案例-demo 总结 工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq.Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录. D

  • python实现RabbitMQ的消息队列的示例代码

    最近在研究redis做消息队列时,顺便看了一下RabbitMQ做消息队列的实现.以下是总结的RabbitMQ中三种exchange模式的实现,分别是fanout, direct和topic. base.py: import pika # 获取认证对象,参数是用户名.密码.远程连接时需要认证 credentials = pika.PlainCredentials("admin", "admin") # BlockingConnection(): 实例化连接对象 # C

随机推荐