Spring框架有哪些核心模块?
- 内容介绍
- 文章标签
- 相关推荐
本文共计2114个文字,预计阅读时间需要9分钟。
目录+重试+消息发送失败处理+消费错误处理+自定义MessageHandler类型+端点+Metrics指标+Serverless+分区系统+轮询消费者+支持多Binder同时使用+建立事件机制+重试+消费者+端点可配置
目录
- 重试
- 消息发送失败的处理
- 消费错误处理
- 自定义MessageHandler类型
- Endpoint端点
- Metrics指标
- Serverless
- Partition统一
- Polling Consumer
- 支持多个Binder同时使用
- 建立事件机制
重试
Consumer端可以配置重试次数,当消息消费失败的时候会进行重试。
底层使用Spring Retry去重试,重试次数可自定义配置。
# 默认重试次数为3,配置大于1时才会生效 spring.cloud.stream.bindings.<channelName>.consumer.maxAttempte=3
消息发送失败的处理
Producer发送消息出错的情况下,可以配置错误处理,将错误信息发送给对应ID的MessageChannel
- 消息发送失败的场景下,会将消息发送到一个
MessageChannel。这个MessageChannel会取ApplicationContext中name为topic.errors(topic就是配置的destination)的Bean。 - 如果找不到就会自动构建一个
PublishSubscribeChannel。 - 然后使用
BridgeHandler订阅这个MessageChannel,同时再设置ApplicationContext中name为errorChannel的PublishSubscribeChannel消息通道为BridgeHandler的outputChannel。
public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel" private SubscribableChannel registerErrorInfrastructure( ProducerDestination destination) { // destination.getName() + ".errors" String errorChannelName = errorsBaseName(destination); SubscribableChannel errorChannel; if (getApplicationContext().containsBean(errorChannelName)) { Object errorChannelObject = getApplicationContext().getBean(errorChannelName); if (!(errorChannelObject instanceof SubscribableChannel)) { throw new IllegalStateException("Error channel '" + errorChannelName + "' must be a SubscribableChannel"); } errorChannel = (SubscribableChannel) errorChannelObject; } else { errorChannel = new PublishSubscribeChannel(); ((GenericApplicationContext) getApplicationContext()).registerBean( errorChannelName, SubscribableChannel.class, () -> errorChannel); } MessageChannel defaultErrorChannel = null; if (getApplicationContext() .containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) { defaultErrorChannel = getApplicationContext().getBean( IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, MessageChannel.class); } if (defaultErrorChannel != null) { BridgeHandler errorBridge = new BridgeHandler(); errorBridge.setOutputChannel(defaultErrorChannel); errorChannel.subscribe(errorBridge); String errorBridgeHandlerName = getErrorBridgeName(destination); ((GenericApplicationContext) getApplicationContext()).registerBean( errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge); } return errorChannel; }
- 示例代码
spring.cloud.stream.bindings.output.destination=test-output # 消息发送失败的处理逻辑默认是关闭的 spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
@Bean("test-output.errors") MessageChannel testOutputErrorChannel() { return new PublishSubscribeChannel(); } @Service class ErrorProduceService { @ServiceActivator(inputChannel = "test-output.errors") public void receiveProduceError(Message receiveMsg) { System.out.println("receive error msg: " + receiveMsg); } }
消费错误处理
Consumer消费消息出错的情况下,可以配置错误处理,将错误信息发给对应ID的MessageChannel
消息错误处理与生产错误处理大致相同。错误的MessageChannel对应的name为topic.group.errors,还会加上多个MessageHandler订阅的一些判断,使用ErrorMessageStrategy创建错误消息等内容。
- 示例代码
spring.cloud.stream.bindings.input.destination=test-input spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT) public void receive(String receiveMsg) { throw new RuntimeException("Oops"); } @ServiceActivator(inputChannel = "test-input.test-input-group.errors") public void receiveConsumeError(Message receiveMsg) { System.out.println("receive error msg: " + receiveMsg); }
建议直接使用topic.group.errors这个消息通道,并设置发送到单播模式的DirectChannel消息通道中(使用@ServiceActivator注解接收会直接构成DirectChannel),这样会确保只会被唯一的一个订阅了topic.group.errors的MessageHandler处理,否则可能会被多个MessageHandler处理,导致出现一些意想不到的结果。
自定义MessageHandler类型
默认情况下,Output Binding对应的MessageChannel和Input Binding对应的SubscribeChannel会被构造成DirectChannel。
SCS提供了BindingTargetFactory接口进行扩展,比如可以扩展构造PublishSubscribeChannel这种广播类型的MessageChannel。
BindingTargetFactory接口只有两个实现类
SubscribableChannelBindingTargetFactory:针对Input Binding和Output Binding都会构造成DirectWithAttributesChannel类型的MessageChannel(一种带有HashMap属性的DirectChannel)。MessageSourceBindingTargetFactory:不支持Output Binding,Input Binding会构造成DefaultPollableMessageSource。DefaultPollableMessageSource内部维护着MessageSource属性,该属性用于拉取消息。
Endpoint端点
SCS提供了BindingsEndpoint,可以获取Binding信息或对Binding生命周期进行修改,比如start、stop、pause或resume。
BindingsEndpoint的ID是bindings,对外暴露了一下3个操作:
- 修改
Binding状态,可以改成STARTED、STOPPED、PAUSED和RESUMED,对应Binding接口的4个操作。 - 查询单个
Binding的状态信息。 - 查询所有
Binding的状态信息。
@Endpoint(id = "bindings") public class BindingsEndpoint { ... @WriteOperation public void changeState(@Selector String name, State state) { Binding<?> binding = BindingsEndpoint.this.locateBinding(name); if (binding != null) { switch (state) { case STARTED: binding.start(); break; case STOPPED: binding.stop(); break; case PAUSED: binding.pause(); break; case RESUMED: binding.resume(); break; default: break; } } } @ReadOperation public List<?> queryStates() { List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings()); bindings.addAll(gatherOutputBindings()); return this.objectMapper.convertValue(bindings, List.class); } @ReadOperation public Binding<?> queryState(@Selector String name) { Assert.notNull(name, "'name' must not be null"); return this.locateBinding(name); } ... }
Metrics指标
该功能自动与micrometer集成进行Metrics统计,可以通过前缀spring.cloud.stream.metrics进行相关配置,配置项spring.cloud.stream.bindings.applicationMetrics.destination会构造MetersPublisherBinding,将相关的metrics发送到MQ中。
Serverless
默认与Spring Cloud Function集成。
可以使用Function处理消息。配置文件需要加上function配置。
spring.cloud.stream.function.definition=uppercase | addprefix
@Bean public Function<String, String> uppercase() { return x -> x.toUpperCase(); } @Bean public Function<String, String> addprefix() { return x -> "prefix-" + x; }
Partition统一
SCS统一Partition相关的设置,可以屏蔽不同MQ Partition的设置。
Producer Binding提供的ProducerProperties提供了一些Partition相关的配置:
partitionKeyExpression:partition key提取表达式。partitionKeyExtractorName:是一个实现PartitionKeyExtractorStrategy接口的Bean name。PartitionKeyExtractorStrategy是一个根据Message获取partition key的接口。如果两者都配置,优先级高于partitionKeyExtractorName。partitionSelectorName:是一个实现PartitionSelectorStrategy接口的Bean name。PartitionSelectorStrategy是一个根据partition key决定选择哪个partition 的接口。partitionSelectorExpression:partition 选择表达式,会根据表达式和partition key得到最终的partition。如果两者都配置,优先partitionSelectorExpression表达式解析partition。partitionCount:partition 个数。该属性不一定会生效,Kafka Binder 和RocketMQ Binder会使用topic上的partition 个数覆盖该属性。
public final class PartitioningInterceptor implements ChannelInterceptor { ... @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) { int partition = this.partitionHandler.determinePartition(message); return MessageConverterConfigurer.this.messageBuilderFactory .fromMessage(message) .setHeader(BinderHeaders.PARTITION_HEADER, partition).build(); } else { return MessageConverterConfigurer.this.messageBuilderFactory .fromMessage(message) .setHeader(BinderHeaders.PARTITION_HEADER, message.getHeaders() .get(BinderHeaders.PARTITION_OVERRIDE)) .removeHeader(BinderHeaders.PARTITION_OVERRIDE).build(); } } } public class PartitionHandler { ... public int determinePartition(Message<?> message) { Object key = extractKey(message); int partition; if (this.producerProperties.getPartitionSelectorExpression() != null) { partition = this.producerProperties.getPartitionSelectorExpression() .getValue(this.evaluationContext, key, Integer.class); } else { partition = this.partitionSelectorStrategy.selectPartition(key, this.partitionCount); } // protection in case a user selector returns a negative. return Math.abs(partition % this.partitionCount); } private Object extractKey(Message<?> message) { Object key = invokeKeyExtractor(message); if (key == null && this.producerProperties.getPartitionKeyExpression() != null) { key = this.producerProperties.getPartitionKeyExpression() .getValue(this.evaluationContext, message); } Assert.notNull(key, "Partition key cannot be null"); return key; } ... }
Polling Consumer
实现MessageSource进行polling操作的Consumer。
普通的Pub/Sub模式需要定义SubscribeableChannel类型的返回值,Polling Consumer需要定义PollableMessageSource类型的返回值。
public interface PollableSink { /** * Input channel name. */ String INPUT = "input"; /** * @return input channel. */ @Input(Sink.INPUT) PollableMessageSource input(); }
支持多个Binder同时使用
支持多个Binder同时使用,在配置Binding的时候需要指定对应的Binder。
配置全局默认的Binder:spring.cloud.stream.default-binder=rocketmq。
配置各个Binder内部的配置信息:
spring.cloud.stream.binders.rocketmq.environment.<xx>=xx
spring.cloud.stream.binders.rocketmq.type=rocketmq
配置Binding对应的Binder:
spring.cloud.stream.bindings.<channelName>.binder=kafka
spring.cloud.stream.bindings.<channelName>.binder=rocketmq
spring.cloud.stream.bindings.<channelName>.binder=rabbit
建立事件机制
比如,新建BindingCreateEvent事件,用户的应用就可以监听该事件在创建Input Binding或Output Binding 时做业务相关的处理。
以上就是Spring Cloud Stream 高级特性使用详解的详细内容,更多关于Spring Cloud Stream 高级特性的资料请关注自由互联其它相关文章!
本文共计2114个文字,预计阅读时间需要9分钟。
目录+重试+消息发送失败处理+消费错误处理+自定义MessageHandler类型+端点+Metrics指标+Serverless+分区系统+轮询消费者+支持多Binder同时使用+建立事件机制+重试+消费者+端点可配置
目录
- 重试
- 消息发送失败的处理
- 消费错误处理
- 自定义MessageHandler类型
- Endpoint端点
- Metrics指标
- Serverless
- Partition统一
- Polling Consumer
- 支持多个Binder同时使用
- 建立事件机制
重试
Consumer端可以配置重试次数,当消息消费失败的时候会进行重试。
底层使用Spring Retry去重试,重试次数可自定义配置。
# 默认重试次数为3,配置大于1时才会生效 spring.cloud.stream.bindings.<channelName>.consumer.maxAttempte=3
消息发送失败的处理
Producer发送消息出错的情况下,可以配置错误处理,将错误信息发送给对应ID的MessageChannel
- 消息发送失败的场景下,会将消息发送到一个
MessageChannel。这个MessageChannel会取ApplicationContext中name为topic.errors(topic就是配置的destination)的Bean。 - 如果找不到就会自动构建一个
PublishSubscribeChannel。 - 然后使用
BridgeHandler订阅这个MessageChannel,同时再设置ApplicationContext中name为errorChannel的PublishSubscribeChannel消息通道为BridgeHandler的outputChannel。
public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel" private SubscribableChannel registerErrorInfrastructure( ProducerDestination destination) { // destination.getName() + ".errors" String errorChannelName = errorsBaseName(destination); SubscribableChannel errorChannel; if (getApplicationContext().containsBean(errorChannelName)) { Object errorChannelObject = getApplicationContext().getBean(errorChannelName); if (!(errorChannelObject instanceof SubscribableChannel)) { throw new IllegalStateException("Error channel '" + errorChannelName + "' must be a SubscribableChannel"); } errorChannel = (SubscribableChannel) errorChannelObject; } else { errorChannel = new PublishSubscribeChannel(); ((GenericApplicationContext) getApplicationContext()).registerBean( errorChannelName, SubscribableChannel.class, () -> errorChannel); } MessageChannel defaultErrorChannel = null; if (getApplicationContext() .containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) { defaultErrorChannel = getApplicationContext().getBean( IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, MessageChannel.class); } if (defaultErrorChannel != null) { BridgeHandler errorBridge = new BridgeHandler(); errorBridge.setOutputChannel(defaultErrorChannel); errorChannel.subscribe(errorBridge); String errorBridgeHandlerName = getErrorBridgeName(destination); ((GenericApplicationContext) getApplicationContext()).registerBean( errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge); } return errorChannel; }
- 示例代码
spring.cloud.stream.bindings.output.destination=test-output # 消息发送失败的处理逻辑默认是关闭的 spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
@Bean("test-output.errors") MessageChannel testOutputErrorChannel() { return new PublishSubscribeChannel(); } @Service class ErrorProduceService { @ServiceActivator(inputChannel = "test-output.errors") public void receiveProduceError(Message receiveMsg) { System.out.println("receive error msg: " + receiveMsg); } }
消费错误处理
Consumer消费消息出错的情况下,可以配置错误处理,将错误信息发给对应ID的MessageChannel
消息错误处理与生产错误处理大致相同。错误的MessageChannel对应的name为topic.group.errors,还会加上多个MessageHandler订阅的一些判断,使用ErrorMessageStrategy创建错误消息等内容。
- 示例代码
spring.cloud.stream.bindings.input.destination=test-input spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT) public void receive(String receiveMsg) { throw new RuntimeException("Oops"); } @ServiceActivator(inputChannel = "test-input.test-input-group.errors") public void receiveConsumeError(Message receiveMsg) { System.out.println("receive error msg: " + receiveMsg); }
建议直接使用topic.group.errors这个消息通道,并设置发送到单播模式的DirectChannel消息通道中(使用@ServiceActivator注解接收会直接构成DirectChannel),这样会确保只会被唯一的一个订阅了topic.group.errors的MessageHandler处理,否则可能会被多个MessageHandler处理,导致出现一些意想不到的结果。
自定义MessageHandler类型
默认情况下,Output Binding对应的MessageChannel和Input Binding对应的SubscribeChannel会被构造成DirectChannel。
SCS提供了BindingTargetFactory接口进行扩展,比如可以扩展构造PublishSubscribeChannel这种广播类型的MessageChannel。
BindingTargetFactory接口只有两个实现类
SubscribableChannelBindingTargetFactory:针对Input Binding和Output Binding都会构造成DirectWithAttributesChannel类型的MessageChannel(一种带有HashMap属性的DirectChannel)。MessageSourceBindingTargetFactory:不支持Output Binding,Input Binding会构造成DefaultPollableMessageSource。DefaultPollableMessageSource内部维护着MessageSource属性,该属性用于拉取消息。
Endpoint端点
SCS提供了BindingsEndpoint,可以获取Binding信息或对Binding生命周期进行修改,比如start、stop、pause或resume。
BindingsEndpoint的ID是bindings,对外暴露了一下3个操作:
- 修改
Binding状态,可以改成STARTED、STOPPED、PAUSED和RESUMED,对应Binding接口的4个操作。 - 查询单个
Binding的状态信息。 - 查询所有
Binding的状态信息。
@Endpoint(id = "bindings") public class BindingsEndpoint { ... @WriteOperation public void changeState(@Selector String name, State state) { Binding<?> binding = BindingsEndpoint.this.locateBinding(name); if (binding != null) { switch (state) { case STARTED: binding.start(); break; case STOPPED: binding.stop(); break; case PAUSED: binding.pause(); break; case RESUMED: binding.resume(); break; default: break; } } } @ReadOperation public List<?> queryStates() { List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings()); bindings.addAll(gatherOutputBindings()); return this.objectMapper.convertValue(bindings, List.class); } @ReadOperation public Binding<?> queryState(@Selector String name) { Assert.notNull(name, "'name' must not be null"); return this.locateBinding(name); } ... }
Metrics指标
该功能自动与micrometer集成进行Metrics统计,可以通过前缀spring.cloud.stream.metrics进行相关配置,配置项spring.cloud.stream.bindings.applicationMetrics.destination会构造MetersPublisherBinding,将相关的metrics发送到MQ中。
Serverless
默认与Spring Cloud Function集成。
可以使用Function处理消息。配置文件需要加上function配置。
spring.cloud.stream.function.definition=uppercase | addprefix
@Bean public Function<String, String> uppercase() { return x -> x.toUpperCase(); } @Bean public Function<String, String> addprefix() { return x -> "prefix-" + x; }
Partition统一
SCS统一Partition相关的设置,可以屏蔽不同MQ Partition的设置。
Producer Binding提供的ProducerProperties提供了一些Partition相关的配置:
partitionKeyExpression:partition key提取表达式。partitionKeyExtractorName:是一个实现PartitionKeyExtractorStrategy接口的Bean name。PartitionKeyExtractorStrategy是一个根据Message获取partition key的接口。如果两者都配置,优先级高于partitionKeyExtractorName。partitionSelectorName:是一个实现PartitionSelectorStrategy接口的Bean name。PartitionSelectorStrategy是一个根据partition key决定选择哪个partition 的接口。partitionSelectorExpression:partition 选择表达式,会根据表达式和partition key得到最终的partition。如果两者都配置,优先partitionSelectorExpression表达式解析partition。partitionCount:partition 个数。该属性不一定会生效,Kafka Binder 和RocketMQ Binder会使用topic上的partition 个数覆盖该属性。
public final class PartitioningInterceptor implements ChannelInterceptor { ... @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) { int partition = this.partitionHandler.determinePartition(message); return MessageConverterConfigurer.this.messageBuilderFactory .fromMessage(message) .setHeader(BinderHeaders.PARTITION_HEADER, partition).build(); } else { return MessageConverterConfigurer.this.messageBuilderFactory .fromMessage(message) .setHeader(BinderHeaders.PARTITION_HEADER, message.getHeaders() .get(BinderHeaders.PARTITION_OVERRIDE)) .removeHeader(BinderHeaders.PARTITION_OVERRIDE).build(); } } } public class PartitionHandler { ... public int determinePartition(Message<?> message) { Object key = extractKey(message); int partition; if (this.producerProperties.getPartitionSelectorExpression() != null) { partition = this.producerProperties.getPartitionSelectorExpression() .getValue(this.evaluationContext, key, Integer.class); } else { partition = this.partitionSelectorStrategy.selectPartition(key, this.partitionCount); } // protection in case a user selector returns a negative. return Math.abs(partition % this.partitionCount); } private Object extractKey(Message<?> message) { Object key = invokeKeyExtractor(message); if (key == null && this.producerProperties.getPartitionKeyExpression() != null) { key = this.producerProperties.getPartitionKeyExpression() .getValue(this.evaluationContext, message); } Assert.notNull(key, "Partition key cannot be null"); return key; } ... }
Polling Consumer
实现MessageSource进行polling操作的Consumer。
普通的Pub/Sub模式需要定义SubscribeableChannel类型的返回值,Polling Consumer需要定义PollableMessageSource类型的返回值。
public interface PollableSink { /** * Input channel name. */ String INPUT = "input"; /** * @return input channel. */ @Input(Sink.INPUT) PollableMessageSource input(); }
支持多个Binder同时使用
支持多个Binder同时使用,在配置Binding的时候需要指定对应的Binder。
配置全局默认的Binder:spring.cloud.stream.default-binder=rocketmq。
配置各个Binder内部的配置信息:
spring.cloud.stream.binders.rocketmq.environment.<xx>=xx
spring.cloud.stream.binders.rocketmq.type=rocketmq
配置Binding对应的Binder:
spring.cloud.stream.bindings.<channelName>.binder=kafka
spring.cloud.stream.bindings.<channelName>.binder=rocketmq
spring.cloud.stream.bindings.<channelName>.binder=rabbit
建立事件机制
比如,新建BindingCreateEvent事件,用户的应用就可以监听该事件在创建Input Binding或Output Binding 时做业务相关的处理。
以上就是Spring Cloud Stream 高级特性使用详解的详细内容,更多关于Spring Cloud Stream 高级特性的资料请关注自由互联其它相关文章!

