springboot整合redis之消息队列

目录
  • 一、项目准备
  • 二、配置类
  • 三、redis中list数据类型
    • 定时器监听队列
    • 运行即监控队列
  • 四、发布/订阅模式
  • 五、ZSet实现延迟队列

一、项目准备

依赖

        <!-- RedisTemplate -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- Redis-Jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

application.yaml配置文件

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    timeout: 4000
    jedis:
      pool:
        max-wait: -1
        max-active: -1
        max-idle: 20
        min-idle: 10

二、配置类

public class ObjectMapperConfig {

    public static final ObjectMapper objectMapper;
    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    static {
        JavaTimeModule javaTimeModule = new JavaTimeModule();
        javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());
        javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());
        objectMapper = new ObjectMapper()
                // 转换为格式化的json(控制台打印时,自动格式化规范)
                //.enable(SerializationFeature.INDENT_OUTPUT)
                // Include.ALWAYS  是序列化对像所有属性(默认)
                // Include.NON_NULL 只有不为null的字段才被序列化,属性为NULL 不序列化
                // Include.NON_EMPTY 如果为null或者 空字符串和空集合都不会被序列化
                // Include.NON_DEFAULT 属性为默认值不序列化
                .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                // 如果是空对象的时候,不抛异常
                .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
                // 反序列化的时候如果多了其他属性,不抛出异常
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                // 取消时间的转化格式,默认是时间戳,可以取消,同时需要设置要表现的时间格式
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                .setDateFormat(new SimpleDateFormat(PATTERN))
                // 对LocalDateTime序列化跟反序列化
                .registerModule(javaTimeModule)

                .setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY)
                // 此项必须配置,否则会报java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX
                .enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY)
        ;
    }

    static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {
        @Override
        public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
            gen.writeString(value.format(DateTimeFormatter.ofPattern(PATTERN)));
        }
    }

    static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
        @Override
        public LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext) throws IOException {
            return LocalDateTime.parse(p.getValueAsString(), DateTimeFormatter.ofPattern(PATTERN));
        }
    }

}
@Configuration
public class RedisConfig {

    /**
     * redisTemplate配置
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 配置连接工厂
        template.setConnectionFactory(factory);

        //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
        Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // 使用StringRedisSerializer来序列化和反序列化redis的key,value采用json序列化
        template.setKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jacksonSerializer);

        // 设置hash key 和value序列化模式
        template.setHashKeySerializer(stringRedisSerializer);
        template.setHashValueSerializer(jacksonSerializer);
        template.afterPropertiesSet();

        return template;
    }
}

三、redis中list数据类型

在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部和尾部添加新的元素

优势:

  • 顺序排序,保证先进先出
  • 队列为空时,自动从Redis数据库删除
  • 在队列的两头插入或删除元素,效率极高,即使队列中元素达到百万级
  • List中可以包含的最大元素数量是4294967295

定时器监听队列

生产者

@Slf4j
@Component
public class MessageProducer {

    public static final String MESSAGE_KEY = "message:queue";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    public void lPush() {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                Long size = redisTemplate.opsForList().leftPush(MESSAGE_KEY, Thread.currentThread().getName() + ":hello world");
                log.info(Thread.currentThread().getName() + ":put message size = " + size);
            }).start();
        }
    }
}

消费者:消费消息,定时器以达到监听队列功能

@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {

    public static final String MESSAGE_KEY = "message:queue";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
    public void rPop() {
        String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
        log.info(message);
    }
}
@RestController
public class RedisController {

    @Autowired
    private MessageProducer messageProducer;

    @GetMapping("/lPush")
    public void lPush() {
        messageProducer.lPush();
    }
}

测试

http://localhost:8080/lPush

可能出现的问题:

1.通过定时器监听List中是否有待处理消息,每执行一次都会发起一次连接,这会造成不必要的浪费。

2.生产速度大于消费速度,队列堆积,消息时效性差,占用内存。

运行即监控队列

修改消息消费者代码。

当队列没有元素时,会阻塞10秒,然后再次监听队列,
需要注意的是,阻塞时间必须小于连接超时时间

@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {

    public static final String MESSAGE_KEY = "message:queue";

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    //@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
    public void rPop() {
        String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
        log.info(message);
    }

    @PostConstruct
    public void brPop() {
        new Thread(() -> {
            while (true) {
                String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY, 10, TimeUnit.SECONDS);
                log.info(message);
            }
        }).start();
    }
}

阻塞时间不能为负,直接报错超时为负
阻塞时间为零,此时阻塞时间等于超时时间,最后报错连接超时
阻塞时间大于超时时间,报错连接超时

测试:

消息不可重复消费,因为消息从队列POP之后就被移除了,即不支持多个消费者消费同一批数据

消息丢失,消费期间发生异常,消息未能正常消费

四、发布/订阅模式

消息可以重复消费,多个消费者订阅同一频道即可

一个消费者根据匹配规则订阅多个频道

消费者只能消费订阅之后发布的消息,这意味着,消费者下线再上线这期间发布的消息将会丢失

数据不具有持久化。同样Redis宕机也会数据丢失

消息发布后,是推送到一个缓冲区(内存),消费者从缓冲区拉取消息,当消息堆积,缓冲区溢出,消费者就会被迫下线,同时释放对应的缓冲区

RedisConfig中添加监听器

    /**
     * redis消息监听器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //订阅频道,通配符*表示任意多个占位符
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));

        return container;
    }

订阅者

package com.yzm.redis08.message;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

public class MySubscribe implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("订阅频道:" + new String(message.getChannel()));
        System.out.println("接收数据:" + new String(message.getBody()));
    }
}

消息发布

    @GetMapping("/publish")
    public void publish() {
        redisTemplate.convertAndSend("channel_first", "hello world");
    }

另一种发布方式

    /**
     * redis消息监听器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //订阅频道,通配符*表示任意多个占位符
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
        // 通配符?:表示一个占位符
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
        listenerAdapter.afterPropertiesSet();
        container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));

        return container;
    }
public class MySubscribe2 {

    public void getMessage(Object message, String channel) {
        System.out.println("订阅频道2:" + channel);
        System.out.println("接收数据2:" + message);
    }
}
    @GetMapping("/publish2")
    public void publish2() {
        redisTemplate.convertAndSend("channel2", "hello world");
    }

消息是实体对象,进行转换

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private static final long serialVersionUID = 5250232737975907491L;
    private Integer id;
    private String username;
}
public class MySubscribe3 implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] bytes) {
        Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class);
        jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
        User user = jacksonSerializer.deserialize(message.getBody());

        System.out.println("订阅频道3:" + new String(message.getChannel()));
        System.out.println("接收数据3:" + user);
    }
}
    /**
     * redis消息监听器容器
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //订阅频道,通配符*:表示任意多个占位符
        container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
        // 通配符?:表示一个占位符
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
        listenerAdapter.afterPropertiesSet();
        container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));

        container.addMessageListener(new MySubscribe3(), new PatternTopic("user"));

        return container;
    }
    @GetMapping("/publish3")
    public void publish3() {
        User user = User.builder().id(1).username("yzm").build();
        redisTemplate.convertAndSend("user", user);
    }

五、ZSet实现延迟队列

生产消息,score = 时间搓+60s随机数

    public static final String MESSAGE_ZKEY = "message:ZSetqueue";
    public volatile AtomicInteger count =  new AtomicInteger();
    public void zAdd() {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                int increment = count.getAndIncrement();
                log.info(Thread.currentThread().getName() + ":put message to zset = " + increment);
                double score = System.currentTimeMillis() + new Random().nextInt(60 * 1000);
                redisTemplate.opsForZSet().add(MESSAGE_ZKEY, Thread.currentThread().getName() + " hello zset:" + increment, score);
            }).start();
        }
    }

消费者:定时任务,每秒执行一次

    public static final String MESSAGE_ZKEY = "message:ZSetqueue";
    public SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
    @Scheduled(initialDelay = 5 * 1000, fixedRate = 1000)
    public void zrangebysocre() {
        log.info("延时队列消费。。。");
        // 拉取score小于当前时间戳的消息
        Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis());
        if (messages != null) {
            for (Object message : messages) {
                Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message);
                log.info("消费了:" + message + "消费时间为:" + simpleDateFormat.format(score));
                redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message);
            }
        }
    }
    @GetMapping("/zadd")
    public void zadd() {
        messageProducer.zAdd();
    }

到此这篇关于springboot整合redis之消息队列的文章就介绍到这了,更多相关springboot redis消息队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot利用redis集成消息队列的方法

    一.pom文件依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.创建消息接收者 变量.方法及构造函数进行标注,完成自动装配的工作. 通过 @Autowired的使用来消除 set ,get方法. @Autowired pub

  • SpringBoot集成Redis实现消息队列的方法

    list 原理说明 Redis 的 list 是按照插入顺序排序的字符串链表. 如图所示,可以通过 lpush 和 rpop 或者 rpush 和 lpop 实现消息队列. 1 lpush 和 rpop 2 rpush 和 lpop 消息队列功能实现 引入 Redis 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data

  • springboot整合redis之消息队列

    目录 一.项目准备 二.配置类 三.redis中list数据类型 定时器监听队列 运行即监控队列 四.发布/订阅模式 五.ZSet实现延迟队列 一.项目准备 依赖 <!-- RedisTemplate --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> &

  • SpringBoot整合Redis实现消息发布与订阅的示例代码

    当我们在多个集群应用中使用到本地缓存时,在数据库数据得到更新后,为保持各个副本当前被修改的数据与数据库数据保持同步,在数据被操作后向其他集群应用发出被更新数据的通知,使其删除;下次当其他应用请求该被更新的数据时,应用会到数据库去取,也就是最新的数据,从而使得被更新数据与数据库保持同步! 能实现发送与接收信息的中间介有很多,比如:RocketMQ.RabbitMQ.ActiveMQ.Kafka等,本次主要简单介绍Redis的推送与订阅功能并集成Spring Boot的实现. 1.添加SpringB

  • Springboot整合ActiveMQ实现消息队列的过程浅析

    目录 pom中导入坐标 书写yml配置 业务层代码 监听器代码 业务层代码 确保你启动了自己电脑的activemq. pom中导入坐标 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> 书写yml配置 spring:  activemq:

  • SpringBoot整合Redis、ApachSolr和SpringSession的示例

    本文介绍了SpringBoot整合Redis.ApachSolr和SpringSession,分享给大家,具体如下: 一.简介 SpringBoot自从问世以来,以其方便的配置受到了广大开发者的青睐.它提供了各种starter简化很多繁琐的配置.SpringBoot整合Druid.Mybatis已经司空见惯,在这里就不详细介绍了.今天我们要介绍的是使用SpringBoot整合Redis.ApacheSolr和SpringSession. 二.SpringBoot整合Redis Redis是大家比

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • Springboot整合Redis实现超卖问题还原和流程分析(分布式锁)

    目录 超卖简单代码 超卖问题 单服务器单应用情况下 设置synchronized Redis实现分布式锁 通过超时间解决上述问题 通过key设置值匹配的方式解决形同虚设问题 最终版 超卖简单代码 写一段简单正常的超卖逻辑代码,多个用户同时操作同一段数据,探究出现的问题. Redis中存储一项数据信息,请求对应接口,获取商品数量信息: 商品数量信息如果大于0,则扣减1,重新存储Redis中: 运行代码测试问题. /** * Redis数据库操作,超卖问题模拟 * @author * */ @Res

  • SpringBoot整合WxJava开启消息推送的实现

    目录 1.引入 WxJava 依赖 2.申请微信小程序 3.微信小程序配置信息 4.消息推送配置 5.接收消息推送的 API 6.消息推送测试 接入微信小程序消息推送服务,可以3种方式选择其一: 1.开发者服务器接收消息推送2.云函数接收消息推送3.微信云托管服务接收消息推送 开发者服务器接收消息推送,开发者需要按照如下步骤完成: 1.填写服务器配置2.验证服务器地址的有效性3.据接口文档实现业务逻辑,接收消息和事件 1.引入 WxJava 依赖 <!-- web支持 --> <depe

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

随机推荐