Spring Boot中使用RSocket的示例代码

1. 概述

RSocket 应用层协议支持 Reactive Streams 语义, 例如:用RSocket作为HTTP的一种替代方案。在本教程中, 我们将看到 RSocket 用在spring boot中,特别是spring boot 如何帮助抽象出更低级别的RSocket API。

2. 依赖

让我们从添加 spring-boot-starter-rsocket 依赖开始:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

这个依赖会传递性的拉取 RSocket 相关的依赖,比如: rsocket-core 和 rsocket-transport-netty

3.示例的应用程序

现在继续我们的简单应用程序。为了突出 RSocket 提供的交互模式,我打算创建一个交易应用程序, 交易应用程序包括客户端和服务器。

3.1. 服务器设置

首先,我们设置由springboot应用程序引导的 RSocket server 服务器。 因为有 spring-boot-starter-rsocket dependency 依赖,所以springboot会自动配置 RSocket server 。 跟平常一样, 可以用属性驱动的方式修改 RSocket server 默认配置值。例如:通过增加如下配置在 application.properties 中,来修改 RSocket 端口

spring.rsocket.server.port=7000

也可以根据需要进一步修改服务器的其他属性

3.2.设置客户端

接下来,我们来设置客户端,也是一个springboot应用程序。虽然springboot自动配置大部分RSocket相关的组件,但还要自定义一些对象来完成设置。

@Configuration
public class ClientConfiguration {

  @Bean
  public RSocket rSocket() {
    return RSocketFactory
     .connect()
     .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
     .frameDecoder(PayloadDecoder.ZERO_COPY)
     .transport(TcpClientTransport.create(7000))
     .start()
     .block();
  }

  @Bean
  RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
    return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
  }
}

这儿我们正在创建 RSocket 客户端并且配置TCP端口为:7000。注意: 该服务端口我们在前面已经配置过。 接下来我们定义了一个RSocket的装饰器对象 RSocketRequester 。 这个对象在我们跟 RSocket server 交互时会为我们提供帮助。 定义这些对象配置后,我们还只是有了一个骨架。在接下来,我们将暴露不同的交互模式, 并看看springboot在这个地方提供帮助的。

4. springboot RSocket 中的 Request/Response

我们从 Request/Response 开始, HTTP 也使用这种通信方式,这也是最常见的、最相似的交互模式。 在这种交互模式里, 由客户端初始化通信并发送一个请求。之后,服务器端执行操作并返回一个响应给客户端--这时通信完成。 在我们的交易应用程序里, 一个客户端询问一个给定的股票的当前的市场数据。 作为回复,服务器会传递请求的数据。

4.1.服务器

在服务器这边,我们首先应该创建一个 controller 来持有我们的处理器方法。 我们会使用 @MessageMapping 注解来代替像SpringMVC中的 @RequestMapping 或者 @GetMapping 注解

@Controller
public class MarketDataRSocketController {

  private final MarketDataRepository marketDataRepository;

  public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
    this.marketDataRepository = marketDataRepository;
  }

  @MessageMapping("currentMarketData")
  public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
    return marketDataRepository.getOne(marketDataRequest.getStock());
  }
}

来研究下我们的控制器。 我们将使用 @Controller 注解来定义一个控制器来处理进入RSocket的请求。 另外,注解 @MessageMapping 让我们定义我们感兴趣的路由和如何响应一个请求。 在这个示例中, 服务器监听路由 currentMarketData , 并响应一个单一的结果 Mono<MarketData> 给客户端。

4.2. 客户端

接下来, 我们的RSocket客户端应该询问一只股票的价格并得到一个单一的响应。 为了初始化请求, 我们该使用 RSocketRequester 类,如下:

@RestController
public class MarketDataRestController {

  private final RSocketRequester rSocketRequester;

  public MarketDataRestController(RSocketRequester rSocketRequester) {
    this.rSocketRequester = rSocketRequester;
  }

  @GetMapping(value = "/current/{stock}")
  public Publisher<MarketData> current(@PathVariable("stock") String stock) {
    return rSocketRequester
     .route("currentMarketData")
     .data(new MarketDataRequest(stock))
     .retrieveMono(MarketData.class);
  }
}

注意:在示例中, RSocket 客户端也是一个 REST 风格的 controller ,以此来访问我们的 RSocket 服务器。因此,我们使用 @RestController 和 @GetMapping 注解来定义我们的请求/响应端点。 在端点方法中, 我们使用的是类 RSocketRequester 并指定了路由。 事实上,这个是服务器端 RSocket 所期望的路由,然后我们传递请求数据。最后,当调用 retrieveMono() 方法时,springboot会帮我们初始化一个请求/响应交互。

5. Spring Boot RSocket 中的 Fire And Forget 模式

接下来我们将查看 Fire And Forget 交互模式。正如名字提示的一样,客户端发送一个请求给服务器,但是不期望服务器的返回响应回来。 在我们的交易程序中, 一些客户端会作为数据资源服务,并且推送市场数据给服务器端。

5.1.服务器端

我们来创建另外一个端点在我们的服务器应用程序中,如下:

@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
  marketDataRepository.add(marketData);
  return Mono.empty();
}

我们又一次定义了一个新的 @MessageMapping 路由为 collectMarketData 。此外, Spring Boot自动转换传入的负载为一个 MarketData 实例。 但是,这儿最大的不同是我们返回一个 Mono<Void> ,因为客户端不需要服务器的返回。

5.2. 客户端

来看看我们如何初始化我们的 fire-and-forget 模式的请求。 我们将创建另外一个REST风格的端点,如下:

@GetMapping(value = "/collect")
public Publisher<Void> collect() {
  return rSocketRequester
   .route("collectMarketData")
   .data(getMarketData())
   .send();
}

这儿我们指定路由和负载将是一个 MarketData 实例。 由于我们使用 send() 方法来代替 retrieveMono() ,所有交互模式变成了 fire-and-forget 模式。

6. Spring Boot RSocket 中的 Request Stream

请求流是一种更复杂的交互模式, 这个模式中客户端发送一个请求,但是在一段时间内从服务器端获取到多个响应。 为了模拟这种交互模式, 客户端会询问给定股票的所有市场数据。

6.1.服务器端

我们从服务器端开始。 我们将添加另外一个消息映射方法,如下:

@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
  return marketDataRepository.getAll(marketDataRequest.getStock());
}

正如所见, 这个处理器方法跟其他的处理器方法非常类似。 不同的部分是我们返回一个 Flux<MarketData> 来代替 Mono<MarketData> 。 最后我们的RSocket服务器会返回多个响应给客户端。

6.2.客户端

在客户端这边, 我们该创建一个端点来初始化请求/响应通信,如下:

@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
  return rSocketRequester
   .route("feedMarketData")
   .data(new MarketDataRequest(stock))
   .retrieveFlux(MarketData.class);
}

我们来研究下RSocket请求。 首先我们定义了路由和请求负载。 然后,我们定义了使用 retrieveFlux() 调用的响应期望。这部分决定了交互模式。 另外注意:由于我们的客户端也是 REST 风格的服务器,客户端也定义了响应媒介类型 MediaType.TEXT_EVENT_STREAM_VALUE 。

7.异常的处理

现在让我们看看在服务器程序中,如何以声明式的方式处理异常。 当处理请求/响应式, 我可以简单的使用 @MessageExceptionHandler 注解,如下:

@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
  return Mono.just(MarketData.fromException(e));
}

这里我们给异常处理方法标记注解为 @MessageExceptionHandler 。作为结果, 这个方法将处理所有类型的异常, 因为 Exception 是所有其他类型的异常的超类。 我们也可以明确地创建更多的不同类型的,不同的异常处理方法。 这当然是请求/响应模式,并且我们返回的是 Mono<MarketData> 。我们期望这里的响应类型跟我们的交互模式的返回类型相匹配。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • springboot websocket简单入门示例

    之前做的需求都是客户端请求服务器响应,新需求是服务器主动推送信息到客户端.百度之后有流.长轮询.websoket等方式进行.但是目前更加推崇且合理的显然是websocket. 从springboot官网翻译了一些资料,再加上百度简单实现了springboot使用websocekt与客户端的双工通信. 1.首先搭建一个简单的springboot环境 <!-- Inherit defaults from Spring Boot --> <parent> <groupId>o

  • SpringBoot webSocket实现发送广播、点对点消息和Android接收

    1.SpringBoot webSocket SpringBoot 使用的websocket 协议,不是标准的websocket协议,使用的是名称叫做STOMP的协议. 1.1 STOMP协议说明 STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议. 它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消

  • 详解spring boot Websocket使用笔记

    本文只作为个人笔记,大部分代码是引用其他人的文章的. 在springboot项目中使用websocket做推送,虽然挺简单的,但初学也踩过几个坑,特此记录. 使用websocket有两种方式:1是使用sockjs,2是使用h5的标准.使用Html5标准自然更方便简单,所以记录的是配合h5的使用方法. 1.pom 核心是@ServerEndpoint这个注解.这个注解是Javaee标准里的注解,tomcat7以上已经对其进行了实现,如果是用传统方法使用tomcat发布项目,只要在pom文件中引入j

  • 详解spring boot实现websocket

    前言 QQ这类即时通讯工具多数是以桌面应用的方式存在.在没有websocket出现之前,如果开发一个网页版的即时通讯应用,则需要定时刷新页面或定时调用ajax请求,这无疑会加大服务器的负载和增加了客户端的流量.而websocket的出现,则完美的解决了这些问题. spring boot对websocket进行了封装,这对实现一个websocket网页即时通讯应用来说,变得非常简单.  一.准备工作 pom.xml引入 <dependency> <groupId>org.spring

  • 详解在Spring Boot框架下使用WebSocket实现消息推送

    spring Boot的学习持续进行中.前面两篇博客我们介绍了如何使用Spring Boot容器搭建Web项目以及怎样为我们的Project添加HTTPS的支持,在这两篇文章的基础上,我们今天来看看如何在Spring Boot中使用WebSocket. 什么是WebSocket WebSocket为浏览器和服务器之间提供了双工异步通信功能,也就是说我们可以利用浏览器给服务器发送消息,服务器也可以给浏览器发送消息,目前主流浏览器的主流版本对WebSocket的支持都算是比较好的,但是在实际开发中使

  • 使用 Spring Boot 实现 WebSocket实时通信

    在开发 Web 应用程序时,我们有时需要将服务端事件推送到连接的客户端.但 HTTP 并不能做到.客户端打开与服务端的连接并请求数据,但服务端不能打开与客户端的连接并推送数据. 为了解决这个限制,我们可以建立了一个轮询模式,网页会间隔地轮询服务器以获取新事件.但这种模式不太理想,因为它增加了 HTTP 开销,速度也只能达到与轮询的速率一样快,并且给服务器增加了不必要的负载. 幸运的是,HTML5 WebSocket 出现了.WebSocket 协议允许浏览器与 Web 服务器之间进行低开销的交互

  • 关于Spring Boot WebSocket整合以及nginx配置详解

    前言 本文主要给大家介绍了关于Spring Boot WebSocket整合及nginx配置的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 一:Spring Boot WebSocket整合 创建一个maven项目,加入如下依赖 <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId>

  • Spring Boot中使用RSocket的示例代码

    1. 概述 RSocket 应用层协议支持 Reactive Streams 语义, 例如:用RSocket作为HTTP的一种替代方案.在本教程中, 我们将看到 RSocket 用在spring boot中,特别是spring boot 如何帮助抽象出更低级别的RSocket API. 2. 依赖 让我们从添加 spring-boot-starter-rsocket 依赖开始: <dependency> <groupId>org.springframework.boot</g

  • Spring Boot中使用RabbitMQ的示例代码

    很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Message Broker是一种消息验证.传输.路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集.消息的分解,并将结果发送到他们的目的地,然后重新组合相应返回给消息用户 调用Web服务来检索数据 响应事件或错误 使用发布-订阅模式

  • Spring Boot使用模板freemarker的示例代码

    最近有好久没有更新博客了,感谢小伙伴的默默支持,不知道是谁又打赏了我一个小红包,谢谢. 今天我们讲讲怎么在Spring Boot中使用模板引擎freemarker,先看看今天的大纲: (1) freemarker介绍: (2) 新建spring-boot-freemarker工程: (3) 在pom.xml引入相关依赖: (4) 编写启动类: (5) 编写模板文件hello.ftl; (6) 编写访问类HelloController; (7) 测试: (8) freemarker配置: (9)

  • Spring Boot实现文件上传示例代码

    使用SpringBoot进行文件上传的方法和SpringMVC差不多,本文单独新建一个最简单的DEMO来说明一下. 主要步骤包括: 1.创建一个springboot项目工程,本例名称(demo-uploadfile). 2.配置 pom.xml 依赖. 3.创建和编写文件上传的 Controller(包含单文件上传和多文件上传). 4.创建和编写文件上传的 HTML 测试页面. 5.文件上传相关限制的配置(可选). 6.运行测试. 项目工程截图如下: 文件代码: <dependencies>

  • spring boot实现软删除的示例代码

    本文开发环境:spring-boot:2.0.3.RELEASE + java1.8 WHY TO DO 软删除:即不进行真正的删除操作.由于我们实体间的约束性(外键)的存在,删除某些数据后,将导致其它的数据不完整.比如,计算机1801班的教师是张三,此时,我们如果把张三删除掉,那么在查询计算机1801班时,由于张三不存了,所以就会报EntityNotFound的错误.当然了,在有外键约束的数据库中,如果张三是1801班的教师,那么我们直接删除张三将报一个约束性的异常.也就是说:直接删除张三这个

  • Spring Boot 整合 Apache Dubbo的示例代码

    Apache Dubbo是一款高性能.轻量级的开源 Java RPC 框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现. 注意,是 Apache Dubbo,不再是 Alibaba Dubbo.简单来说就是 Alibaba 将 Dubbo 移交给 Apache 开源社区进行维护.参见 dubbo-spring-boot-project Spring Boot 系列:整合 Alibaba Dubbo 一.本文示例说明 1.1 框架版本Dubbo 版本

  • Spring Boot如何动态创建Bean示例代码

    前言 本文主要给大家介绍了关于Spring Boot动态创建Bean的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. SpringBoot测试版本:1.3.4.RELEASE 参考代码如下: package com.spring.configuration; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.su

  • Spring boot集成Kafka+Storm的示例代码

    前言 由于业务需求需要把Strom与kafka整合到spring boot项目里,实现其他服务输出日志至kafka订阅话题,storm实时处理该话题完成数据监控及其他数据统计,但是网上教程较少,今天想写的就是如何整合storm+kafka 到spring boot,顺带说一说我遇到的坑. 使用工具及环境配置 ​ 1. java 版本jdk-1.8 ​ 2. 编译工具使用IDEA-2017 ​ 3. maven作为项目管理 ​ 4.spring boot-1.5.8.RELEASE 需求体现 1.

  • spring boot整合mybatis+mybatis-plus的示例代码

    Spring boot对于我来说是一个刚接触的新东西,学习过程中,发现这东西还是很容易上手的,Spring boot没配置时会默认使用Spring data jpa,这东西可以说一个极简洁的工具,可是我还是比较喜欢用mybatis,工具是没有最好的,只有这合适自己的. 说到mybatis,最近有一个很好用的工具--------mybatis-Plus(官网),现在更新的版本是2.1.2,这里使用的也是这个版本.我比较喜欢的功能是代码生成器,条件构造器,这样就可以更容易的去开发了. mybatis

  • spring boot 与kafka集成的示例代码

    新建spring boot项目 这里使用intellij IDEA 添加kafka集成maven <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLoc

随机推荐