Spring Cloud Stream 高级特性使用详解

目录
  • 重试
  • 消息发送失败的处理
  • 消费错误处理
  • 自定义MessageHandler类型
  • Endpoint端点
  • Metrics指标
  • Serverless
  • Partition统一
  • Polling Consumer
  • 支持多个Binder同时使用
  • 建立事件机制

重试

Consumer端可以配置重试次数,当消息消费失败的时候会进行重试。

底层使用Spring Retry去重试,重试次数可自定义配置。

# 默认重试次数为3,配置大于1时才会生效
spring.cloud.stream.bindings.<channelName>.consumer.maxAttempte=3

消息发送失败的处理

Producer发送消息出错的情况下,可以配置错误处理,将错误信息发送给对应ID的MessageChannel

  • 消息发送失败的场景下,会将消息发送到一个MessageChannel。这个MessageChannel会取ApplicationContext中name为topic.errorstopic就是配置的destination)的Bean。
  • 如果找不到就会自动构建一个PublishSubscribeChannel
  • 然后使用BridgeHandler订阅这个MessageChannel,同时再设置ApplicationContext中name为errorChannelPublishSubscribeChannel消息通道为BridgeHandleroutputChannel
    public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel"
    private SubscribableChannel registerErrorInfrastructure(
            ProducerDestination destination) {
        // destination.getName() + ".errors"
        String errorChannelName = errorsBaseName(destination);
        SubscribableChannel errorChannel;
        if (getApplicationContext().containsBean(errorChannelName)) {
            Object errorChannelObject = getApplicationContext().getBean(errorChannelName);
            if (!(errorChannelObject instanceof SubscribableChannel)) {
                throw new IllegalStateException("Error channel '" + errorChannelName
                        + "' must be a SubscribableChannel");
            }
            errorChannel = (SubscribableChannel) errorChannelObject;
        }
        else {
            errorChannel = new PublishSubscribeChannel();
            ((GenericApplicationContext) getApplicationContext()).registerBean(
                    errorChannelName, SubscribableChannel.class, () -> errorChannel);
        }
        MessageChannel defaultErrorChannel = null;
        if (getApplicationContext()
                .containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
            defaultErrorChannel = getApplicationContext().getBean(
                    IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
                    MessageChannel.class);
        }
        if (defaultErrorChannel != null) {
            BridgeHandler errorBridge = new BridgeHandler();
            errorBridge.setOutputChannel(defaultErrorChannel);
            errorChannel.subscribe(errorBridge);
            String errorBridgeHandlerName = getErrorBridgeName(destination);
            ((GenericApplicationContext) getApplicationContext()).registerBean(
                    errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge);
        }
        return errorChannel;
    }
  • 示例代码
spring.cloud.stream.bindings.output.destination=test-output
# 消息发送失败的处理逻辑默认是关闭的
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
    @Bean("test-output.errors")
    MessageChannel testOutputErrorChannel() {
        return new PublishSubscribeChannel();
    }
    @Service
    class ErrorProduceService {
        @ServiceActivator(inputChannel = "test-output.errors")
        public void receiveProduceError(Message receiveMsg) {
            System.out.println("receive error msg: " + receiveMsg);
        }
    }

消费错误处理

Consumer消费消息出错的情况下,可以配置错误处理,将错误信息发给对应ID的MessageChannel

消息错误处理与生产错误处理大致相同。错误的MessageChannel对应的name为topic.group.errors,还会加上多个MessageHandler订阅的一些判断,使用ErrorMessageStrategy创建错误消息等内容。

  • 示例代码
spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT)
public void receive(String receiveMsg) {
    throw new RuntimeException("Oops");
}
@ServiceActivator(inputChannel = "test-input.test-input-group.errors")
public void receiveConsumeError(Message receiveMsg) {
    System.out.println("receive error msg: " + receiveMsg);
}

建议直接使用topic.group.errors这个消息通道,并设置发送到单播模式的DirectChannel消息通道中(使用@ServiceActivator注解接收会直接构成DirectChannel),这样会确保只会被唯一的一个订阅了topic.group.errorsMessageHandler处理,否则可能会被多个MessageHandler处理,导致出现一些意想不到的结果。

自定义MessageHandler类型

默认情况下,Output Binding对应的MessageChannelInput Binding对应的SubscribeChannel会被构造成DirectChannel

SCS提供了BindingTargetFactory接口进行扩展,比如可以扩展构造PublishSubscribeChannel这种广播类型的MessageChannel

BindingTargetFactory接口只有两个实现类

  • SubscribableChannelBindingTargetFactory:针对Input BindingOutput Binding都会构造成DirectWithAttributesChannel类型的MessageChannel(一种带有HashMap属性的DirectChannel)。
  • MessageSourceBindingTargetFactory:不支持Output BindingInput Binding会构造成DefaultPollableMessageSourceDefaultPollableMessageSource内部维护着MessageSource属性,该属性用于拉取消息。

Endpoint端点

SCS提供了BindingsEndpoint,可以获取Binding信息或对Binding生命周期进行修改,比如startstoppauseresume

BindingsEndpoint的ID是bindings,对外暴露了一下3个操作:

  • 修改Binding状态,可以改成STARTEDSTOPPEDPAUSEDRESUMED,对应Binding接口的4个操作。
  • 查询单个Binding的状态信息。
  • 查询所有Binding的状态信息。
@Endpoint(id = "bindings")
public class BindingsEndpoint {
  ...
  @WriteOperation
    public void changeState(@Selector String name, State state) {
        Binding<?> binding = BindingsEndpoint.this.locateBinding(name);
        if (binding != null) {
            switch (state) {
            case STARTED:
                binding.start();
                break;
            case STOPPED:
                binding.stop();
                break;
            case PAUSED:
                binding.pause();
                break;
            case RESUMED:
                binding.resume();
                break;
            default:
                break;
            }
        }
    }
    @ReadOperation
    public List<?> queryStates() {
        List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings());
        bindings.addAll(gatherOutputBindings());
        return this.objectMapper.convertValue(bindings, List.class);
    }
    @ReadOperation
    public Binding<?> queryState(@Selector String name) {
        Assert.notNull(name, "'name' must not be null");
        return this.locateBinding(name);
    }
  ...
}

Metrics指标

该功能自动与micrometer集成进行Metrics统计,可以通过前缀spring.cloud.stream.metrics进行相关配置,配置项spring.cloud.stream.bindings.applicationMetrics.destination会构造MetersPublisherBinding,将相关的metrics发送到MQ中。

Serverless

默认与Spring Cloud Function集成。

可以使用Function处理消息。配置文件需要加上function配置。

spring.cloud.stream.function.definition=uppercase | addprefix

  @Bean
  public Function<String, String> uppercase() {
      return x -> x.toUpperCase();
  }
  @Bean
  public Function<String, String> addprefix() {
      return x -> "prefix-" + x;
  }

Partition统一

SCS统一Partition相关的设置,可以屏蔽不同MQ Partition的设置。

Producer Binding提供的ProducerProperties提供了一些Partition相关的配置:

  • partitionKeyExpression:partition key提取表达式。
  • partitionKeyExtractorName:是一个实现PartitionKeyExtractorStrategy接口的Bean name。PartitionKeyExtractorStrategy是一个根据Message获取partition key的接口。如果两者都配置,优先级高于partitionKeyExtractorName
  • partitionSelectorName:是一个实现PartitionSelectorStrategy接口的Bean name。PartitionSelectorStrategy是一个根据partition key决定选择哪个partition 的接口。
  • partitionSelectorExpression:partition 选择表达式,会根据表达式和partition key得到最终的partition。如果两者都配置,优先partitionSelectorExpression表达式解析partition。
  • partitionCount:partition 个数。该属性不一定会生效,Kafka Binder 和RocketMQ Binder会使用topic上的partition 个数覆盖该属性。
public final class PartitioningInterceptor implements ChannelInterceptor {
      ...
      @Override
      public Message<?> preSend(Message<?> message, MessageChannel channel) {
      if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) {
        int partition = this.partitionHandler.determinePartition(message);
        return MessageConverterConfigurer.this.messageBuilderFactory
          .fromMessage(message)
          .setHeader(BinderHeaders.PARTITION_HEADER, partition).build();
      }
      else {
        return MessageConverterConfigurer.this.messageBuilderFactory
          .fromMessage(message)
          .setHeader(BinderHeaders.PARTITION_HEADER,
                     message.getHeaders()
                     .get(BinderHeaders.PARTITION_OVERRIDE))
          .removeHeader(BinderHeaders.PARTITION_OVERRIDE).build();
      }
    }
}
public class PartitionHandler {
      ...
      public int determinePartition(Message<?> message) {
        Object key = extractKey(message);
        int partition;
        if (this.producerProperties.getPartitionSelectorExpression() != null) {
            partition = this.producerProperties.getPartitionSelectorExpression()
                    .getValue(this.evaluationContext, key, Integer.class);
        }
        else {
            partition = this.partitionSelectorStrategy.selectPartition(key,
                    this.partitionCount);
        }
        // protection in case a user selector returns a negative.
        return Math.abs(partition % this.partitionCount);
    }
    private Object extractKey(Message<?> message) {
        Object key = invokeKeyExtractor(message);
        if (key == null && this.producerProperties.getPartitionKeyExpression() != null) {
            key = this.producerProperties.getPartitionKeyExpression()
                    .getValue(this.evaluationContext, message);
        }
        Assert.notNull(key, "Partition key cannot be null");
        return key;
    }
      ...
}

Polling Consumer

实现MessageSource进行polling操作的Consumer

普通的Pub/Sub模式需要定义SubscribeableChannel类型的返回值,Polling Consumer需要定义PollableMessageSource类型的返回值。

public interface PollableSink {
    /**
     * Input channel name.
     */
    String INPUT = "input";
    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    PollableMessageSource input();
}

支持多个Binder同时使用

支持多个Binder同时使用,在配置Binding的时候需要指定对应的Binder

配置全局默认的Binderspring.cloud.stream.default-binder=rocketmq

配置各个Binder内部的配置信息:

spring.cloud.stream.binders.rocketmq.environment.<xx>=xx

spring.cloud.stream.binders.rocketmq.type=rocketmq

配置Binding对应的Binder

spring.cloud.stream.bindings.<channelName>.binder=kafka

spring.cloud.stream.bindings.<channelName>.binder=rocketmq

spring.cloud.stream.bindings.<channelName>.binder=rabbit

建立事件机制

比如,新建BindingCreateEvent事件,用户的应用就可以监听该事件在创建Input BindingOutput Binding 时做业务相关的处理。

以上就是Spring Cloud Stream 高级特性使用详解的详细内容,更多关于Spring Cloud Stream 高级特性的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringCloud Stream 整合RabbitMQ的基本步骤

    目录 一.项目介绍 二.生产者 三.消费者 四.验证 在postman 访问生产者接口: 本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤: 引入SpringCloud 引入SpringCloud Stream相关依赖 定义绑定接口: 消息生产者(Output…Binding) .消息消费者(Input…Binding) @EnableBinding 在对应类上进行定义 @StreamListener 在对应方法上创建监听用来消费消息 调用output的send()

  • Springcloud整合stream,rabbitmq实现消息驱动功能

    springcloud整合stream,rabbitmq实现消息驱动功能 1.代码实现: 创建项目stream 添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/>

  • SpringCloud Stream消息驱动实例详解

    1. 消息驱动概述 1.1 是什么 在实际应用中有很多消息中间件,比如现在企业里常用的有ActiveMQ.RabbitMQ.RocketMQ.Kafka等,学习所有这些消息中间件无疑需要大量时间经历成本,那有没有一种技术,使我们不再需要关注具体的消息中间件的细节,而只需要用一种适配绑定的方式,自动的在各种消息中间件内切换呢?消息驱动就是这样的技术,它能 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型. SpringCloud Stream是一个构件消息驱动微服务的框架.应用程序通过i

  • Spring Cloud Stream简单用法

    目录 简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者 生产者 消费者 Stream其他特性 消息发送失败的处理 消费者错误处理 Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样).如此⼀来,我们学习.开发.维护MQ都会变得轻松.⽬前Spring Cloud St

  • spring-cloud-stream结合kafka使用详解

    1.pom文件导入依赖 <!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> 2.application.yml文件配置 spring: cloud: stream: kafka: bind

  • SpringCloud Stream使用解析

    SpringCloudStream 官方定义Spring Cloud Stream 是一个构建消息驱动微服务的框架. 应用通过inputs和outputs来与Spring Cloud Stream中binder对象交互.通过我们配置来binding(绑定),而Spring Cloud Stream中的binder对象负责与消息中间件交互.所以,我们只需要搞清楚如何与Spring Cloud Stream 交互就可以方便使用消息驱动的方式. 通过使用Spring Integration来连接消息代

  • Spring Cloud Stream 高级特性使用详解

    目录 重试 消息发送失败的处理 消费错误处理 自定义MessageHandler类型 Endpoint端点 Metrics指标 Serverless Partition统一 Polling Consumer 支持多个Binder同时使用 建立事件机制 重试 Consumer端可以配置重试次数,当消息消费失败的时候会进行重试. 底层使用Spring Retry去重试,重试次数可自定义配置. # 默认重试次数为3,配置大于1时才会生效 spring.cloud.stream.bindings.<ch

  • Spring Cloud Feign高级应用实例详解

    这篇文章主要介绍了Spring Cloud Feign高级应用实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.使用feign进行服务间的调用 Spring boot2X Consul如何使用Feign实现服务调用 2.开启gzip压缩 Feign支持对请求与响应的压缩,以提高通信效率,需要在服务消费者配置文件开启压缩支持和压缩文件的类型 添加配置 feign.compression.request.enabled=true feig

  • Spring Cloud Alibaba 之 Nacos教程详解

    Nacos 技术讲解 一提到分布式系统就不的不提一下 CAP 原则 Nacos简介 Nacos是阿里的一个开源产品,它是针对微服务架构中的服务发现.配置管理.服务治理的综合性解决方案. 官方介绍是这样的: Nacos致力于帮助您发现.配置和管理微服务.Nacos提供了一组简单易用的特性集,帮助您实现动态服务发现.服务配置管理.服务及流量管理.Nacos帮助您更敏捷和容易地构建.交付和管理微服务平台.Nacos是构建以"服务"为中心的现代应用架构的服务基础设施. 什么是CAP CAP原则

  • Spring Cloud Zuul的重试配置详解

    Spring Cloud Zuul模块本身就包含了对于hystrix和ribbon的依赖,当我们使用zuul通过path和serviceId的组合来配置路由的时候,可以通过hystrix和ribbon的配置调整路由请求的各种时间超时机制. 1 ribbon配置举例 配置连接超时时间1秒,请求处理时间2秒,统一服务server尝试重连1次,切换server重连1次 ribbon: ConnectTimeout: 1000 ReadTimeout: 2000 MaxAutoRetries: 1 Ma

  • Spring Cloud Gateway使用Token验证详解

    引入依赖 <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <ty

  • Spring Cloud 动态刷新配置信息教程详解

    有时候在配置中心有些参数是需要修改的,这时候如何不重启而达到实时生效的效果呢? 添加依赖 <dependencies> ... <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> ... </dependencies>

  • Vue.js组件高级特性实例详解

    本文实例讲述了Vue.js组件高级特性.分享给大家供大家参考,具体如下: 1 递归 为组件设置 name 属性,这个组件就可以在自身的模板内递归调用自己. html: <div id="app"> <deniro-component :count="1"></deniro-component> </div> js: Vue.component('deniro-component',{ name:'deniro-comp

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • Spring框架学习之Cache抽象详解

    目录 1.简介 cache和buffer 2.缓存抽象 3.spring缓存抽象与多进程 官方文档  8.0 Spring为不同缓存做了一层抽象,这里通过阅读文档以及源码会对使用以及原理做一些学习笔记. 1.简介 从3.1版开始,Spring Framework提供了对现有Spring应用程序透明地添加缓存的支持. 与事务支持类似,缓存抽象允许一致地使用各种缓存解决方案,而对代码的影响最小. 从Spring 4.1开始,通过JSR-107注释和更多自定义选项的支持,缓存抽象得到了显着改进. ca

随机推荐