SpringAOP+RabbitMQ+WebSocket实战详解

背景

最近公司的客户要求,分配给员工的任务除了有微信通知外,还希望PC端的网页也能实时收到通知。管理员分配任务是在我们的系统A,而员工接受任务是在系统B。两个系统都是现在已投入使用的系统。

技术选型

根据需求我们最终选用SpringAOP+RabbitMQ+WebSocket。

SpringAOP可以让我们不修改原有代码,直接将原有service作为切点,加入切面。RabbitMQ可以让A系统和B系统解耦。WebSocket则可以达到实时通知的要求。

SpringAOP

AOP称为面向切面编程,在程序开发中主要用来解决一些系统层面上的问题,比如日志,事务,权限等待。是Spring的核心模块,底层是通过动态代理来实现(动态代理将在之后的文章重点介绍)。

基本概念

Aspect(切面):通常是一个类,里面可以定义切入点和通知。

JointPoint(连接点):程序执行过程中明确的点,一般是方法的调用。

Advice(通知):AOP在特定的切入点上执行的增强处理,有before,after,afterReturning,afterThrowing,around。

Pointcut(切入点):就是带有通知的连接点,在程序中主要体现为书写切入点表达式。

通知类型

Before:在目标方法被调用之前做增强处理。

@Before只需要指定切入点表达式即可

AfterReturning:在目标方法正常完成后做增强。

@AfterReturning除了指定切入点表达式后,还可以指定一个返回值形参名returning,代表目标方法的返回值

AfterThrowing:主要用来处理程序中未处理的异常。

@AfterThrowing除了指定切入点表达式后,还可以指定一个throwing的返回值形参名,可以通过该形参名

来访问目标方法中所抛出的异常对象

After:在目标方法完成之后做增强,无论目标方法时候成功完成。

@After可以指定一个切入点表达式

Around:环绕通知,在目标方法完成前后做增强处理,环绕通知是最重要的通知类型,像事务,日志等都是环绕通知,注意编程中核心是一个ProceedingJoinPoint。

RabbitMQ

从图中我们可以看到RabbitMQ主要的结构有:Routing、Binding、Exchange、Queue。

Queue

Queue(队列)RabbitMQ的作用是存储消息,队列的特性是先进先出。

Exchange

生产者产生的消息并不是直接发送给消息队列Queue的,而是要经过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue,还会将不符合路由规则的消息丢弃。

Routing

用于标记或生产者寻找Exchange。

Binding

用于Exchange和Queue做关联。

Exchange Type fanout

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

direct

direct会把消息路由到那些binding key与routing key完全匹配的Queue中。

topic

direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。

headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

WebSocket

了解websocket必须先知道几个常用的web通信技术及其区别。

短轮询

短轮询的基本思路就是浏览器每隔一段时间向浏览器发送http请求,服务器端在收到请求后,不论是否有数据更新,都直接进行响应。这种方式实现的即时通信,本质上还是浏览器发送请求,服务器接受请求的一个过程,通过让客户端不断的进行请求,使得客户端能够模拟实时地收到服务器端的数据的变化。

这种方式的优点是比较简单,易于理解,实现起来也没有什么技术难点。缺点是显而易见的,这种方式由于需要不断的建立http连接,严重浪费了服务器端和客户端的资源。尤其是在客户端,距离来说,如果有数量级想对比较大的人同时位于基于短轮询的应用中,那么每一个用户的客户端都会疯狂的向服务器端发送http请求,而且不会间断。人数越多,服务器端压力越大,这是很不合理的。

因此短轮询不适用于那些同时在线用户数量比较大,并且很注重性能的Web应用。

长轮询/ comet

comet指的是,当服务器收到客户端发来的请求后,不会直接进行响应,而是先将这个请求挂起,然后判断服务器端数据是否有更新。如果有更新,则进行响应,如果一直没有数据,则到达一定的时间限制(服务器端设置)后关闭连接。

长轮询和短轮询比起来,明显减少了很多不必要的http请求次数,相比之下节约了资源。长轮询的缺点在于,连接挂起也会导致资源的浪费。

SSE

SSE是HTML5新增的功能,全称为Server-Sent Events。它可以允许服务推送数据到客户端。SSE在本质上就与之前的长轮询、短轮询不同,虽然都是基于http协议的,但是轮询需要客户端先发送请求。而SSE最大的特点就是不需要客户端发送请求,可以实现只要服务器端数据有更新,就可以马上发送到客户端。

SSE的优势很明显,它不需要建立或保持大量的客户端发往服务器端的请求,节约了很多资源,提升应用性能。并且SSE的实现非常简单,不需要依赖其他插件。

WebSocket

WebSocket是Html5定义的一个新协议,与传统的http协议不同,该协议可以实现服务器与客户端之间全双工通信。简单来说,首先需要在客户端和服务器端建立起一个连接,这部分需要http。连接一旦建立,客户端和服务器端就处于平等的地位,可以相互发送数据,不存在请求和响应的区别。

WebSocket的优点是实现了双向通信,缺点是服务器端的逻辑非常复杂。现在针对不同的后台语言有不同的插件可以使用。

四种Web即时通信技术比较

从兼容性角度考虑,短轮询>长轮询>长连接SSE>WebSocket;

从性能方面考虑,WebSocket>长连接SSE>长轮询>短轮询。

实战

项目使用SpringBoot搭建。RabbitMQ的安装这里不讲述。

RabbitMQ配置

两个系统A、B都需要操作RabbitMQ,其中A生产消息,B消费消息。故都需要配置。

1、首先引入RabbitMQ的dependency:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

这个dependency中包含了RabbitMQ相关dependency。

2、在项目的配置文件里配置为使用rabbitmq及其参数。

application-pro.yml

#消息队列
message.queue.type: rabbitmq
## rabbit mq properties
rabbitmq:
 host: localhost
 port: 5672
 username: guest
 password: guest

application.properties

#将要使用的队列名
rabbitmq.websocket.msg.queue=websocket_msg_queue

3、创建配置文件。队列的创建交给spring。

RabbitMQConfig.java

@Configuration
@EnableRabbit
public class RabbitMQConfig {

  @Value("${rabbitmq.host}")
  private String host;
  @Value("${rabbitmq.port}")
  private String port;
  @Value("${rabbitmq.username}")
  private String username;
  @Value("${rabbitmq.password}")
  private String password;
  @Value("${rabbitmq.websocket.msg.queue}")
  private String webSocketMsgQueue;

  @Bean
  public ConnectionFactory connectionFactory() throws IOException {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setUsername(username);
    factory.setPassword(password);
//    factory.setVirtualHost("test");
    factory.setHost(host);
    factory.setPort(Integer.valueOf(port));
    factory.setPublisherConfirms(true);

    //设置队列参数,是否持久化、队列TTL、队列消息TTL等
    factory.createConnection().createChannel(false).queueDeclare(webSocketMsgQueue, true, false, false, null);
    return factory;
  }

  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  @Bean
  @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  // 必须是prototype类型
  public RabbitTemplate rabbitTemplate() throws IOException {
    return new RabbitTemplate(connectionFactory());
  }

  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
  }
}

4、系统B中创建队列监听,当队列有消息时,发送websocket通知。

RabbitMQListener.java

@Component
public class RabbitMQListener {

  @Autowired
  private RabbitMQService mqService;

  /**
   * WebSocket推送监听器
   * @param socketEntity
   * @param deliveryTag
   * @param channel
   */
  @RabbitListener(queues = "websocket_msg_queue")
  public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
    mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel);
  }

}

RabbitMQService.java

public class RabbitMQService {
  @Autowired
  private MessageWebSocketHandler messageWebSocketHandler;

  /**
   * @param socketMsgEntity
   * @param deliveryTag
   * @param channel
   * @throws IOException
   */
  void handleWebSocketMsg(WebSocketMsgEntity socketMsgEntity, long deliveryTag, Channel channel) throws IOException {
    try {
      messageWebSocketHandler.sendMessageToUsers(socketMsgEntity.toJsonString(), socketMsgEntity.getToUserIds());
      channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
      channel.basicNack(deliveryTag, false, false);
    }
  }
}

WebSocketMsgEntity为MQ中传送的实体。

public class WebSocketMsgEntity implements Serializable {
  public enum OrderType{
    repair("维修"),
    maintain("保养"),
    measure("计量");

    OrderType(String value){
      this.value = value;
    }
    String value;

    public String getValue() {
      return value;
    }
  }
  //设备名称
  private String EquName;
  //设备编号
  private String EquId;
  //工单类型
  private OrderType orderType;
  //工单单号
  private String orderId;
  //工单状态
  private String orderStatus;
  //创建时间
  private Date createTime;
  //消息接收人ID
  private List<String> toUserIds;

  public String getEquName() {
    return EquName;
  }

  public void setEquName(String equName) {
    EquName = equName;
  }

  public String getOrderId() {
    return orderId;
  }

  public void setOrderId(String orderId) {
    this.orderId = orderId;
  }

  public String getEquId() {
    return EquId;
  }

  public void setEquId(String equId) {
    EquId = equId;
  }

  public String getOrderStatus() {
    return orderStatus;
  }

  public void setOrderStatus(String orderStatus) {
    this.orderStatus = orderStatus;
  }

  public OrderType getOrderType() {
    return orderType;
  }

  public void setOrderType(OrderType orderType) {
    this.orderType = orderType;
  }

  public Date getCreateTime() {
    return createTime;
  }

  public void setCreateTime(Date createTime) {
    this.createTime = createTime;
  }

  public List<String> getToUserIds() {
    return toUserIds;
  }

  public void setToUserIds(List<String> toUserIds) {
    this.toUserIds = toUserIds;
  }

  public String toJsonString(){
    return JSON.toJSONString(this);
  }
}

SpringAOP

1、系统A中创建一个切面类DataInterceptor.java

@Aspect
@Component
public class DataInterceptor {
  @Autowired
  private MessageQueueService queueService;

  //维修工单切点
  @Pointcut("execution(* com.zhishang.hes.common.service.impl.RepairServiceImpl.executeFlow(..))")
  private void repairMsg() {
  }

  /**
   * 返回通知,方法执行正常返回时触发
   *
   * @param joinPoint
   * @param result
   */
  @AfterReturning(value = "repairMsg()", returning = "result")
  public void afterReturning(JoinPoint joinPoint, Object result) {
    //此处可以获得切点方法名
    //String methodName = joinPoint.getSignature().getName();
    EquipmentRepair equipmentRepair = (EquipmentRepair) result;
    WebSocketMsgEntity webSocketMsgEntity = this.generateRepairMsgEntity(equipmentRepair);
    if (webSocketMsgEntity == null) {
      return;
    }
    queueService.send(webSocketMsgEntity);
  }

  /**
   * 生成发送到MQ的维修消息
   *
   * @param equipmentRepair
   * @return
   */
  private WebSocketMsgEntity generateRepairMsgEntity(EquipmentRepair equipmentRepair) {
    WebSocketMsgEntity webSocketMsgEntity = generateRepairMsgFromTasks(equipmentRepair);
    return webSocketMsgEntity;
  }

  /**
   * 从任务中生成消息
   *
   * @param equipmentRepair
   * @return
   */
  private WebSocketMsgEntity generateRepairMsgFromTasks(EquipmentRepair equipmentRepair) {
    //业务代码略
  }

}

2、发送消息到MQ。这里只贴了发送的核心代码

public class RabbitMessageQueue extends AbstractMessageQueue {

  @Value("${rabbitmq.websocket.msg.queue}")
  private String webSocketMsgQueue;

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Override
  public void send(WebSocketMsgEntity entity) {
    //没有指定exchange,则使用默认名为“”的exchange,binding名与queue名相同
    rabbitTemplate.convertAndSend(webSocketMsgQueue, entity);
  }
}

WebSocket

1、 系统B中引入websocket服务端dependency

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-websocket</artifactId>
  <version>4.3.10.RELEASE</version>
</dependency>

2、 配置websocket,添加处理类

WebSocketConfigurer.java

@Configuration
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

  private static Logger logger = LoggerFactory.getLogger(WebSocketConfig.class);

  @Override
  public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    //配置webSocket路径
    registry.addHandler(messageWebSocketHandler(),"/msg-websocket").addInterceptors(new MyHandshakeInterceptor()).setAllowedOrigins("*");
    //配置webSocket路径 支持前端使用socketJs
    registry.addHandler(messageWebSocketHandler(), "/sockjs/msg-websocket").setAllowedOrigins("*").addInterceptors(new MyHandshakeInterceptor()).withSockJS();
  }

  @Bean
  public MessageWebSocketHandler messageWebSocketHandler() {
    logger.info("......创建MessageWebSocketHandler......");
    return new MessageWebSocketHandler();
  }

}

MessageWebSocketHandler.java 主要用于websocket连接及消息发送处理。配置中还使用了连接握手时的处理,主要是取用户登陆信息,这里不多讲述。

public class MessageWebSocketHandler extends TextWebSocketHandler {
  private static Logger logger = LoggerFactory.getLogger(SystemWebSocketHandler.class);
  private static ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketSession>> users = new ConcurrentHashMap<>();

  @Override
  public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    String userId = session.getAttributes().get("WEBSOCKET_USERID").toString();
    logger.info("......AfterConnectionEstablished......");
    logger.info("session.getId:" + session.getId());
    logger.info("session.getLocalAddress:" + session.getLocalAddress().toString());
    logger.info("userId:" + userId);
    //websocket连接后记录连接信息
    if (users.keySet().contains(userId)) {
      CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId);
      webSocketSessions.add(session);
    } else {
      CopyOnWriteArraySet<WebSocketSession> webSocketSessions = new CopyOnWriteArraySet<>();
      webSocketSessions.add(session);
      users.put(userId, webSocketSessions);
    }
  }

  @Override
  public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
    removeUserSession(session);
    if (session.isOpen()) {
      session.close();
    }
    logger.info("异常出现handleTransportError" + throwable.getMessage());
  }

  @Override
  public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
    removeUserSession(session);
    logger.info("关闭afterConnectionClosed" + closeStatus.getReason());
  }

  @Override
  public boolean supportsPartialMessages() {
    return false;
  }

  /**
   * 给符合要求的在线用户发送消息
   *
   * @param message
   */
  public void sendMessageToUsers(String message, List<String> userIds) throws IOException{
    if (StringUtils.isEmpty(message) || CollectionUtils.isEmpty(userIds)) {
      return;
    }
    if (users.isEmpty()) {
      return;
    }
    for (String userId : userIds) {
      if (!users.keySet().contains(userId)) {
        continue;
      }
      CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId);
      if (webSocketSessions == null) {
        continue;
      }
      for (WebSocketSession webSocketSession : webSocketSessions) {
        if (webSocketSession.isOpen()) {
          try {
            webSocketSession.sendMessage(new TextMessage(message));
          } catch (IOException e) {
            logger.error(" WebSocket server send message ERROR " + e.getMessage());
            try {
              throw e;
            } catch (IOException e1) {
              e1.printStackTrace();
            }
          }
        }
      }
    }
  }

  /**
   * websocket清除连接信息
   *
   * @param session
   */
  private void removeUserSession(WebSocketSession session) {
    String userId = session.getAttributes().get("WEBSOCKET_USERID").toString();
    if (users.keySet().contains(userId)) {
      CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId);
      webSocketSessions.remove(session);
      if (webSocketSessions.isEmpty()) {
        users.remove(userId);
      }
    }
  }
}

整个功能完成后,A系统分配任务时,系统B登陆用户收到的消息如图:

总体流程:

1、对于系统B,每个登陆的用户都会和服务器建立websocket长连接。

2、系统A生成任务,AOP做出响应,将封装的消息发送给MQ。

3、系统B中的MQ监听发现队列有消息到达,消费消息。

4、系统B通过websocket长连接将消息发给指定的登陆用户。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Boot之AOP配自定义注解的最佳实践过程

    前言 AOP(Aspect Oriented Programming),即面向切面编程,是Spring框架的大杀器之一. 首先,我声明下,我不是来系统介绍什么是AOP,更不是照本宣科讲解什么是连接点.切面.通知和切入点这些让人头皮发麻的概念. 今天就来说说AOP的一些应用场景以及如何通过和其他特性的结合提升自己的灵活性.下面话不多说了,来一起看看详细的介绍吧 AOP应用举例 AOP的一大好处就是解耦.通过切面,我们可以将那些反复出现的代码抽取出来,放在一个地方统一处理. 同时,抽出来的代码很多是

  • Spring AOP中的JDK和CGLib动态代理哪个效率更高?

    一.背景 今天有小伙伴面试的时候被问到:Spring AOP中JDK 和 CGLib动态代理哪个效率更高? 二.基本概念 首先,我们知道Spring AOP的底层实现有两种方式:一种是JDK动态代理,另一种是CGLib的方式. 自Java 1.3以后,Java提供了动态代理技术,允许开发者在运行期创建接口的代理实例,后来这项技术被用到了Spring的很多地方. JDK动态代理主要涉及java.lang.reflect包下边的两个类:Proxy和InvocationHandler.其中,Invoc

  • 在AOP中Spring生成代理类的两种方式

    Java 动态代理.具体有如下四步骤: 通过实现 InvocationHandler 接口创建自己的调用处理器: 通过为 Proxy 类指定 ClassLoader 对象和一组 interface 来创建动态代理类: 通过反射机制获得动态代理类的构造函数,其唯一参数类型是调用处理器接口类型: 通过构造函数创建动态代理类实例,构造时调用处理器对象作为参数被传入. 在AOP中,Spring通过生成代理类,来完成切面的织入. Spring生成代理类有2种方式. 如果目标对象实现的是一个接口,Sprin

  • springboot注册bean的三种方法

    spring在启动时会自己把bean(java组件)注册到ioc容器里,实现控制反转,在开发人员使用spring开发应用程序时,你是看不到new关键字的,所有对象都应该从容器里获得,它们的 生命周期 在放入容器时已经确定! 下面说一下三种注册bean的方法 @ComponentScan @Bean @Import @ComponentScan注册指定包里的bean Spring容器会扫描@ComponentScan配置的包路径,找到标记@Component注解的类加入到Spring容器. 我们经

  • SpringBoot AOP使用笔记

    1. 启用AOP a. 在类上添加@Aspect注解 b. 注入该类, 可以使用@Component进行注入到Spring容器中 2. 通过PointCut对象创建切入点 a. 在某个方法使用类似下面的方法进行注入 @Pointcut("execution(* com.sguess.service.IAOPService.*(..))") private void pointcut() { } i. 其中,execution表达式为 execution(modifiers-patter

  • Spring的AOP极简入门

    AOP是Spring中的面向切面的编程,这里简单感受一下如何在xml文件中配置一个切面. 如上图所示,配置一个切面的主要思路有以下几个步骤. 1,首先需要把实现切面功能的类声明为一个bean,例如图中的minstrel. 2,前面的配置都在<aop:config>标签下进行.一个切面对应一个<aop:aspect>标签,标签的ref可以指定实现该切面的bean是哪一个. 3,然后定义切入点,使用标签<aop:pointcut>,切入点通过expression来匹配到需要

  • Spring中IOC和AOP的深入讲解

    前言 Spring是一个开源框架,Spring是于2003 年兴起的一个轻量级的Java 开发框架,由Rod Johnson 在其著作Expert One-On-One J2EE Development and Design中阐述的部分理念和原型衍生而来.它是为了解决企业应用开发的复杂性而创建的.Spring使用基本的JavaBean来完成以前只可能由EJB完成的事情.然而,Spring的用途不仅限于服务器端的开发.从简单性.可测试性和松耦合的角度而言,任何Java应用都可以从Spring中受益

  • SpringBoot AOP控制Redis自动缓存和更新的示例

    导入redis的jar包 <!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.0.4.RELEASE</version> </dependency> 编写自定义缓存注解 /**

  • 深入理解Spring事务的传播行为

    前言 本文主要介绍下Spring事务中的传播行为.事务传播行为是Spring框架独有的事务增强特性,他不属于的事务实际提供方数据库行为.这是Spring为我们提供的强大的工具箱,使用事务传播行可以为我们的开发工作提供许多便利. 下面话不多说了,来一起看看详细的介绍吧 事务传播行为介绍 Spring中的7个事务传播行为: |事务行为|说明 | |:--|:--| |PROPAGATION_REQUIRED | 支持当前事务,假设当前没有事务.就新建一个事务 | | PROPAGATION_SUPP

  • SpringBoot实战之SSL配置详解

    1.SSL介绍和说明 SSL的配置也是我们在实际应用中经常遇到的场景 SSL(Secure Sockets Layer,安全套接层)是为网络通信提供安全及数据完整性的一种协议,SSL在网络传输层对网络连接进行加密.SSL协议位于TCP/IP协议与各种应用层协议之间,为数据通信提供安全支持.SSL协议可以分为两层:SSL记录协议(SSL Record Protocal),它建立在可靠的传输协议(如TCP)之上,为高层协议提供数据封装.压缩.加密等基础功能的支持.SSL握手协议(SSL Handsh

随机推荐