Spring Boot集成Java DSL的实现代码

Spring Integration Java DSL已经融合到Spring Integration Core 5.0,这是一个聪明而明显的举动,因为:

  • 基于Java Config启动新Spring项目的每个人都使用它
  • SI Java DSL使您可以使用Lambdas等新的强大Java 8功能
  • 您可以使用 基于IntegrationFlowBuilderBuilder模式构建流

让我们看看基于ActiveMQ JMS的示例如何使用它。

Maven依赖:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>

  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
  </dependency>

  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
  </dependency>

  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-kahadb-store</artifactId>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-java-dsl</artifactId>
    <version>1.2.3.RELEASE</version>
  </dependency>
</dependencies>

示例1:Jms入站网关

我们有以下ServiceActivator

@Service
public class ActiveMQEndpoint {
  @ServiceActivator(inputChannel = "inboundChannel")
  public void processMessage(final String inboundPayload) {
    System.out.println("Inbound message: "+inboundPayload);
  }
}

如果您想使用SI Java DSL 将inboundPayload从Jms队列发送到Gateway风格的激活器,那么请使用DSLJms工厂:

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
  return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
  return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
  final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
  defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
  defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
  defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
  return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
  return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsInboundGateway dataEndpoint() {
  return Jms.inboundGateway(listenerContainer())
      .requestChannel(inboundChannel()).get();
}

通过dataEndpoint bean 返回JmsInboundGatewaySpec,您还可以向SI通道或Jms目标发送回复。查看文档。

示例2:Jms消息驱动的通道适配器

如果您正在寻找替换消息驱动通道适配器的XML JMS配置,那么JmsMessageDrivenChannelAdapter是一种适合您的方式:

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
  return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
  return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
  final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
  defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
  defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
  defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
  return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
  return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
  final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
      new ChannelPublishingJmsMessageListener();
  channelPublishingJmsMessageListener.setExpectReply(false);
  final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
      JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
      );

  messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
  return messageDrivenChannelAdapter;
}

与前面的示例一样,入站有效负载如样本1中一样发送给激活器。

示例3:使用JAXB的Jms消息驱动的通道适配器

在典型的场景中,您希望通过Jms接受XML作为文本消息,将其转换为JAXB存根并在服务激活器中处理它。我将向您展示如何使用SI Java DSL执行此操作,但首先让我们为xml处理添加两个依赖项:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-xml</artifactId>
  </dependency>

  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
  </dependency>

我们将通过JMS接受shiporders ,所以首先XSD命名为shiporder.xsd:

<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">

  <xs:element name="shiporder">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="orderperson" type="xs:string"/>
        <xs:element name="shipto">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="name" type="xs:string"/>
              <xs:element name="address" type="xs:string"/>
              <xs:element name="city" type="xs:string"/>
              <xs:element name="country" type="xs:string"/>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
        <xs:element name="item" maxOccurs="unbounded">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="title" type="xs:string"/>
              <xs:element name="note" type="xs:string" minOccurs="0"/>
              <xs:element name="quantity" type="xs:positiveInteger"/>
              <xs:element name="price" type="xs:decimal"/>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
      <xs:attribute name="orderid" type="xs:string" use="required"/>
    </xs:complexType>
  </xs:element>

</xs:schema>

新增JAXB maven plugin 生成JAXB存根:

 <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>jaxb2-maven-plugin</artifactId>
      <version>2.3.1</version>
      <executions>
        <execution>
          <id>xjc-schema1</id>
          <goals>
            <goal>xjc</goal>
          </goals>
          <configuration>
            <!-- Use all XSDs under the west directory for sources here. -->
            <sources>
              <source>src/main/resources/xsds/shiporder.xsd</source>
            </sources>

            <!-- Package name of the generated sources. -->
            <packageName>com.example.stubs</packageName>
            <outputDirectory>src/main/java</outputDirectory>
            <clearOutputDir>false</clearOutputDir>
          </configuration>
        </execution>
      </executions>
    </plugin>

我们已经准备好了存根类和一切,现在使用Jaxb magic的Java DSL JMS消息驱动适配器:

/**
 * Sample 3: Jms message driven adapter with JAXB
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
  final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
      new ChannelPublishingJmsMessageListener();
  channelPublishingJmsMessageListener.setExpectReply(false);
  channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()));
  final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
      JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
  );

  messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
  return messageDrivenChannelAdapter;
}

@Bean
public Jaxb2Marshaller shipOrdersMarshaller() {
  Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
  marshaller.setContextPath("com.example.stubs");
  return marshaller;
}

XML配置在Java中使用它可以为您提供如此强大的功能和灵活性。要完成此示例,inboundChannel的服务激活器将如下所示:

/**
 * Sample 3
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
  System.out.println(shiporder.getOrderid());
  System.out.println(shiporder.getOrderperson());
}

要测试流,您可以使用以下XML通过JConsole发送到JMS队列:

 <?xml version="1.0" encoding="UTF-8"?>
  <shiporder orderid="889923"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="shiporder.xsd">
   <orderperson>John Smith</orderperson>
    <shipto>
     <name>Ola Nordmann</name>
     <address>Langgt 23</address>
     <city>4000 Stavanger</city>
     <country>Norway</country>
    </shipto>
    <item>
     <title>Empire Burlesque</title>
     <note>Special Edition</note>
     <quantity>1</quantity>
     <price>10.90</price>
     </item>
    <item>
     <title>Hide your heart</title>
     <quantity>1</quantity>
     <price>9.90</price>
    </item>
  </shiporder>

示例4:具有JAXB和有效负载根路由的Jms消息驱动的通道适配器

另一种典型情况是接受XML作为JMS文本消息,将其转换为JAXB存根并根据有效负载根类型将有效负载路由到某个服务激活器。当然SI Java DSL支持所有类型的路由,我将向您展示如何根据有效载荷类型进行路由。

首先,将以下XSD添加到shiporder.xsd所在的文件夹中,并将其命名为purchaseorder.xsd:

<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
      xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
      targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
      elementFormDefault="qualified">
  <xsd:element name="PurchaseOrder">
    <xsd:complexType>
      <xsd:sequence>
        <xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
        <xsd:element name="BillTo" type="tns:USAddress"/>
      </xsd:sequence>
      <xsd:attribute name="OrderDate" type="xsd:date"/>
    </xsd:complexType>
  </xsd:element>

  <xsd:complexType name="USAddress">
    <xsd:sequence>
      <xsd:element name="name"  type="xsd:string"/>
      <xsd:element name="street" type="xsd:string"/>
      <xsd:element name="city"  type="xsd:string"/>
      <xsd:element name="state" type="xsd:string"/>
      <xsd:element name="zip"  type="xsd:integer"/>
    </xsd:sequence>
    <xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
  </xsd:complexType>
</xsd:schema>

然后添加到jaxb maven插件配置:

 <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>jaxb2-maven-plugin</artifactId>
      <version>2.3.1</version>
      <executions>
        <execution>
          <id>xjc-schema1</id>
          <goals>
            <goal>xjc</goal>
          </goals>
          <configuration>
            <!-- Use all XSDs under the west directory for sources here. -->
            <sources>
              <source>src/main/resources/xsds/shiporder.xsd</source>
              <source>src/main/resources/xsds/purchaseorder.xsd</source>
            </sources>

            <!-- Package name of the generated sources. -->
            <packageName>com.example.stubs</packageName>
            <outputDirectory>src/main/java</outputDirectory>
            <clearOutputDir>false</clearOutputDir>
          </configuration>
        </execution>
      </executions>
    </plugin>

运行mvn clean install以生成新XSD的JAXB存根。现在承诺有效负载根映射:

@Bean
public Jaxb2Marshaller ordersMarshaller() {
  Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
  marshaller.setContextPath("com.example.stubs");
  return marshaller;
}

/**
 * Sample 4: Jms message driven adapter with Jaxb and Payload routing.
 * @return
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
  final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
      new ChannelPublishingJmsMessageListener();
  channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(ordersMarshaller()));
  final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
      JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
  );

  messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
  return messageDrivenChannelAdapter;
}

@Bean
public IntegrationFlow payloadRootMapping() {
  return IntegrationFlows.from(inboundChannel()).<Object, Class<?>>route(Object::getClass, m->m
      .subFlowMapping(Shiporder.class, sf->sf.handle((MessageHandler) message -> {
        final Shiporder shiporder = (Shiporder) message.getPayload();
        System.out.println(shiporder.getOrderperson());
        System.out.println(shiporder.getOrderid());
      }))
      .subFlowMapping(PurchaseOrder.class, sf->sf.handle((MessageHandler) message -> {
        final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
        System.out.println(purchaseOrderType.getBillTo().getName());
      }))
  ).get();
}

注意payloadRootMapping bean,让我们解释一下重要的部分:

  • <Object, Class<?>> route - 表示来自inboundChannel的输入将是Object,并且将根据Class <?>执行路由
  • subFlowMapping(Shiporder.class.. - ShipOders的处理。
  • subFlowMapping(PurchaseOrder.class ... - 处理PurchaseOrders。

要测试ShipOrder有效负载,请使用示例3中的XML,以测试PurchaseOrder有效负载,使用以下XML:

<?xml version="1.0" encoding="utf-8"?>
<PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">
 <ShipTo country="US">
  <name>name1</name>
  <street>street1</street>
  <city>city1</city>
  <state>state1</state>
  <zip>1</zip>
 </ShipTo>
 <ShipTo country="US">
  <name>name2</name>
  <street>street2</street>
  <city>city2</city>
  <state>state2</state>
  <zip>-79228162514264337593543950335</zip>
 </ShipTo>
 <BillTo country="US">
  <name>name1</name>
  <street>street1</street>
  <city>city1</city>
  <state>state1</state>
  <zip>1</zip>
 </BillTo>
</PurchaseOrder>

应根据subflow 子流Map路由两个有效载荷。

示例5:IntegrationFlowAdapter

除了企业集成模式的其他实现(check them out)),我需要提到IntegrationFlowAdapter。通过扩展此类并实现buildFlow方法,如:

[url=https://bitbucket.org/Component/]@Component[/url]
public class MyFlowAdapter extends IntegrationFlowAdapter {

@Autowired
 private ConnectionFactory rabbitConnectionFactory;

 @Override
 protected IntegrationFlowDefinition<?> buildFlow() {
   return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
        .<String, String>transform(String::toLowerCase)
        .channel(c -> c.queue("myFlowAdapterOutput"));
 }

你可以将bean的重复声明包装成一个组件并给它们所需的流量。然后可以配置这样的组件并将其作为一个类实例提供给调用代码!

因此,让我们举例说明这个repo中的示例3更短一些,并为所有JmsEndpoints定义基类,并在其中定义重复bean:

public class JmsEndpoint extends IntegrationFlowAdapter {

  private String queueName;

  private String channelName;

  private String contextPath;

  /**
   * @param queueName
   * @param channelName
   * @param contextPath
   */
  public JmsEndpoint(String queueName, String channelName, String contextPath) {
    this.queueName = queueName;
    this.channelName = channelName;
    this.contextPath = contextPath;
  }

  @Override
  protected IntegrationFlowDefinition<?> buildFlow() {
    return from(Jms.messageDrivenChannelAdapter(listenerContainer())
      .jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()))
    ).channel(channelName);
  }

  @Bean
  public Jaxb2Marshaller shipOrdersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setContextPath(contextPath);
    return marshaller;
  }

  @Bean
  public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
  }

  @Bean
  public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
  }

  @Bean
  public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName(queueName);
    return defaultMessageListenerContainer;
  }

  @Bean
  public MessageChannel inboundChannel() {
    return MessageChannels.direct(channelName).get();
  }
}

现在声明特定队列的Jms端点很容易:

@Bean
public JmsEndpoint jmsEndpoint() {
  return new JmsEndpoint("jms.activeMQ.Test", "inboundChannel", "com.example.stubs");
}

inboundChannel的服务激活器:

/**
 * Sample 3, 5
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
  System.out.println(shiporder.getOrderid());
  System.out.println(shiporder.getOrderperson());
}

您不应该错过在项目中使用IntegrationFlowAdapter。我喜欢它的概念。

我最近在Embedit的新的基于Spring Boot的项目中开始使用Spring Integration Java DSL 。即使有一些配置,我发现它非常有用。

  • 它很容易调试。不添加像wiretap这样的配置。
  • 阅读起来要容易得多。是的,即使是lambdas!
  • 它很强大。在Java配置中,您现在有很多选择。

源码地址:https://bitbucket.org/tomask79/spring-integration-java-dsl

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Cloud原理详解

    之前一直在看<Spring Cloud微服务实战>,最近又看了架构笔记的<拜托!面试请不要再问我Spring Cloud底层原理>,对Spring Cloud的主要组件的原理有了更深的理解,特地做一下总结 一.Spring Cloud核心组件:Eureka (1)Netflix Eureka 1).Eureka服务端:也称服务注册中心,同其他服务注册中心一样,支持高可用配置.如果Eureka以集群模式部署,当集群中有分片出现故障时,那么Eureka就转入自我保护模式.它允许在分片故

  • 浅谈Java中Spring Boot的优势

    Spring Boot 是由 Pivotal 团队提供的全新框架,其设计目的是用来简化新 Spring 应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置. Spring Boot 简化了基于 Spring 的应用开发,通过少量的代码就能创建一个独立的.产品级别的 Spring 应用. 作为一名 Java 程序员如果你已经厌恶了传统的开发模式,希望有一个全新的框架可以提供快速开发,简单集成的编程体验,强烈建议你学习了解 Spring Boot .

  • java Springboot实现多文件上传功能

    前端采用layui框架,讲解多文件上传的完整实现功能. 前端html重点代码如下: <div class="layui-form-item"> <label class="layui-form-label">上传文件</label> <div class="layui-input-block"> <div class="layui-upload"> <butto

  • Spring相关知识点的总结与梳理

    1).IOC:控制反转,某一接口具体实现类的选择控制权从调用类中移除,转交给第三方决定,即由Spring容器借由Bean配置来进行控制 2).DI:依赖注入,让调用类对某一接口实现类的依赖关系由第三方(容器或协作类)注入,以移除调用类对某一接口实现类的依赖 3).IOC主要分为3种类型:构造函数注入.属性注入和接口注入.Spring支持构造函数注入和属性注入 4).类装载器ClassLoader 类装载器就是寻找类的字节码文件并构造出类在JVM内部表示对象的组件.在Java中,类装载器把一个类装

  • SpringBoot中关于static和templates的注意事项以及webjars的配置

    1. 默认情况下, 网页存放于static目录下, 默认的"/"指向的是~/resouces/static/index.html文 2. 如果引入了thymeleaf, 则默认指向的地址为~/resouces/templates/index.html <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymel

  • JavaTCP上传文本文件代码

    基于聊天客户端的基础上的文件(TXT文件)传输 客户端代码: public class UploadClient { public static void main(String[] args) throws UnknownHostException, IOException { // TODO Auto-generated method stub //1,创建socket客户端对象 Socket s = new Socket("localhost",10005); //2,读取本地文

  • Spring多对象引入方法

    在以前使用xml配置注入的时候, 可以通过name名称注入, 也可以使用type类型注入. 在SpringBoot中, 可以使用@Resource和@Autowried注解进行注入. @Resource 默认会使用名称进行注入,  如果找不到, 会使用自动使用类型进行注入. @Autowried, 则会在容器中寻找匹配的对象, 如果找到则注入成功, 如果没找到或者找到多个, 则会报错. 但是, 如果有多个的情况下, 可以使用@Qualifier("别名") 进行约束. 1.创建Bean

  • SpringBoot之Java配置的实现

    Java配置也是Spring4.0推荐的配置方式,完全可以取代XML的配置方式,也是SpringBoot推荐的方式. Java配置是通过@Configuation和@Bean来实现的: 1.@Configuation注解,说明此类是配置类,相当于Spring的XML方式 2.@Bean注解,注解在方法上,当前方法返回的是一个Bean eg: 此类没有使用@Service等注解方式 package com.wisely.heighlight_spring4.ch1.javaconfig; publ

  • Java五子棋AI实现代码

    思路: ①五子棋界面的实现 ②交互下棋的实现 ③重绘 ④AI,实现人机对战 五子棋和简单AI的实现: 首先将五子棋的界面写出来. 首先我们写一个接口类,定义好棋盘的数据(目的是方便修改). public interface Config { public static final int X0=50;//左上角起点X值 public static final int Y0=50;//左上角起点Y值 public static final int ROWS=15;//横向线数 public sta

  • 详解java中spring里的三大拦截器

    Filter 新建 TimeFilter @Component public class TimeFilter implements Filter { @Override public void init(FilterConfig filterConfig) throws ServletException { System.out.println("time filter init"); } @Override public void doFilter(ServletRequest s

随机推荐