亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁技術文章
文章詳情頁

Spring Cloud Stream簡單用法

瀏覽:100日期:2023-06-29 18:49:32
目錄簡單使用Spring Cloud Stream 構建基于RocketMQ的生產者和消費者生產者消費者Stream其他特性消息發送失敗的處理消費者錯誤處理

Spring Cloud Stream對Spring Cloud體系中的Mq進⾏了很好的上層抽象,可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細節差異,就像Hibernate屏蔽掉了具體數據庫(Mysql/Oracle⼀樣)。如此⼀來,我們學習、開發、維護MQ都會變得輕松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在這個基礎上提供了RocketMQ的支持

簡單使用Spring Cloud Stream 構建基于RocketMQ的生產者和消費者生產者

pom文件中加入依賴

<dependencies><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.1.0.RELEASE</version></dependency> </dependencies>

配置文件中增加關于Spring Cloud Stream binder和bindings的配置

spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: output: producer: group: test sync: true bindings:output: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON

其中destination代表生產的數據發送到的topic 然后定義一個channel用于數據發送

import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;public interface TestChannel { @Output('output') MessageChannel output();}

最后構造數據發送的接口

@Controllerpublic class SendMessageController { @Resource private TestChannel testChannel; @ResponseBody @RequestMapping(value = 'send', method = RequestMethod.GET) public String sendMessage() {String messageId = UUID.randomUUID().toString();Message<String> message = MessageBuilder.withPayload('this is a test:' + messageId).setHeader(MessageConst.PROPERTY_TAGS, 'test').build();try { testChannel.output().send(message); return messageId + '發送成功';} catch (Exception e) { return messageId + '發送失敗,原因:' + e.getMessage();} }}消費者

消費者的pom引入與生產者相同,在此不再贅述,配置時需要將stream的output修改為input并修改對應屬性

spring: application: name: zhao-cloud-stream-consumer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: input: consumer: tags: test bindings:input: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON group: test

另外關于channel的構造也要做同樣的修改

import org.springframework.cloud.stream.annotation.Input;import org.springframework.messaging.SubscribableChannel;public interface TestChannel { @Input('input') SubscribableChannel input();}

最后我在啟動類中對收到的消息進行了監聽

@StreamListener('input') public void receiveInput(@Payload Message message) throws ValidationException {System.out.println('input1 receive: ' + message.getPayload() + ', foo header: ' + message.getHeaders().get('foo')); }

測試結果

Spring Cloud Stream簡單用法

Spring Cloud Stream簡單用法

Stream其他特性消息發送失敗的處理

消息發送失敗后悔發送到默認的一個“topic.errors'的channel中(topic是配置的destination)。要配置消息發送失敗的處理,需要將錯誤消息的channel打開 消費者配置如下

spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: output: producer: group: test sync: true bindings:output: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON producer: errorChannelEnabled: true

在啟動類中配置錯誤消息的Channel信息

@Bean('stream-test-topic.errors') MessageChannel testoutPutErrorChannel(){return new PublishSubscribeChannel(); }

新建異常處理service

import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.Message;import org.springframework.stereotype.Service;@Servicepublic class ErrorProducerService { @ServiceActivator(inputChannel = 'stream-test-topic.errors') public void receiveProducerError(Message message){System.out.println('receive error msg :'+message); }}

當發生異常時,由于測試類中已經將異常捕獲,處理發送異常主要是在這里進行。模擬,應用與rocketMq斷開的場景。可見

Spring Cloud Stream簡單用法 Spring Cloud Stream簡單用法

消費者錯誤處理

首先增加配置為

spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: output: producer: group: test sync: true bindings:output: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON producer: errorChannelEnabled: true

增加相應的模擬異常的操作

@StreamListener('input') public void receiveInput(@Payload Message message) throws ValidationException {//System.out.println('input1 receive: ' + message.getPayload() + ', foo header: ' + message.getHeaders().get('foo'));throw new RuntimeException('oops'); } @ServiceActivator(inputChannel = 'stream-test-topic.test.errors') public void receiveConsumeError(Message message){System.out.println('receive error msg'+message.getPayload()); }

Spring Cloud Stream簡單用法

代碼地址https://github.com/zhendiao/deme-code/tree/main/zp

到此這篇關于Spring Cloud Stream簡單用法的文章就介紹到這了,更多相關Spring Cloud Stream使用內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Spring
相關文章:
主站蜘蛛池模板: 亚洲国产成人99精品激情在线 | 久久精品免费全国观看国产 | 国产精品麻豆传媒 | 国产高清免费在线 | 国产大学生真实在线播放 | 国产精品亚洲片在线观看麻豆 | a一级片| 视频一区二区在线 | 青草视频免费观看 | 农村寡妇一级毛片免费播放 | 久久国产精品一国产精品 | 一级片黑人 | 国产伦理久久精品久久久久 | 奇米影视久久777中文字幕 | 中文字幕美日韩在线高清 | 成年人黄视频 | 日韩精品中文字幕视频一区 | 成人日韩在线观看 | 亚洲综合区小说区激情区噜噜 | 国产99在线 | 亚洲 | 97视频在线免费观看 | 色婷婷激婷婷深爱五月老司机 | 日韩一区二区三区在线播放 | 影音先锋5566中文源资源 | 一级片软件 | 在线成人| 性短视频在线观看免费不卡流畅 | 日韩免费毛片全部不收费 | 久久久国产亚洲精品 | 日韩精品在线一区二区 | 一级片久久 | 瑟瑟网站在线观看 | 51精品视频在线播放观看 | 自由xxx色视频18 | 啪啪免费网站视频观看 | 欧美毛片一级的免费的 | 操批视频 | 91丨国产 | 俄罗斯小younv另类 | 国产黄的网站免费 | 国产九九热视频 |