Spring Cloud Stream簡單用法
Spring Cloud Stream對Spring Cloud體系中的Mq進(jìn)⾏了很好的上層抽象,可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細(xì)節(jié)差異,就像Hibernate屏蔽掉了具體數(shù)據(jù)庫(Mysql/Oracle⼀樣)。如此⼀來,我們學(xué)習(xí)、開發(fā)、維護(hù)MQ都會變得輕松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在這個(gè)基礎(chǔ)上提供了RocketMQ的支持
簡單使用Spring Cloud Stream 構(gòu)建基于RocketMQ的生產(chǎn)者和消費(fèi)者生產(chǎn)者pom文件中加入依賴
<dependencies><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.1.0.RELEASE</version></dependency> </dependencies>
配置文件中增加關(guān)于Spring Cloud Stream binder和bindings的配置
spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: output: producer: group: test sync: true bindings:output: destination: stream-test-topic content-type: text/plain # 內(nèi)容格式。這里使用 JSON
其中destination代表生產(chǎn)的數(shù)據(jù)發(fā)送到的topic 然后定義一個(gè)channel用于數(shù)據(jù)發(fā)送
import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;public interface TestChannel { @Output('output') MessageChannel output();}
最后構(gòu)造數(shù)據(jù)發(fā)送的接口
@Controllerpublic class SendMessageController { @Resource private TestChannel testChannel; @ResponseBody @RequestMapping(value = 'send', method = RequestMethod.GET) public String sendMessage() {String messageId = UUID.randomUUID().toString();Message<String> message = MessageBuilder.withPayload('this is a test:' + messageId).setHeader(MessageConst.PROPERTY_TAGS, 'test').build();try { testChannel.output().send(message); return messageId + '發(fā)送成功';} catch (Exception e) { return messageId + '發(fā)送失敗,原因:' + e.getMessage();} }}消費(fèi)者
消費(fèi)者的pom引入與生產(chǎn)者相同,在此不再贅述,配置時(shí)需要將stream的output修改為input并修改對應(yīng)屬性
spring: application: name: zhao-cloud-stream-consumer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: input: consumer: tags: test bindings:input: destination: stream-test-topic content-type: text/plain # 內(nèi)容格式。這里使用 JSON group: test
另外關(guān)于channel的構(gòu)造也要做同樣的修改
import org.springframework.cloud.stream.annotation.Input;import org.springframework.messaging.SubscribableChannel;public interface TestChannel { @Input('input') SubscribableChannel input();}
最后我在啟動類中對收到的消息進(jìn)行了監(jiān)聽
@StreamListener('input') public void receiveInput(@Payload Message message) throws ValidationException {System.out.println('input1 receive: ' + message.getPayload() + ', foo header: ' + message.getHeaders().get('foo')); }
測試結(jié)果
消息發(fā)送失敗后悔發(fā)送到默認(rèn)的一個(gè)“topic.errors'的channel中(topic是配置的destination)。要配置消息發(fā)送失敗的處理,需要將錯(cuò)誤消息的channel打開 消費(fèi)者配置如下
spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: output: producer: group: test sync: true bindings:output: destination: stream-test-topic content-type: text/plain # 內(nèi)容格式。這里使用 JSON producer: errorChannelEnabled: true
在啟動類中配置錯(cuò)誤消息的Channel信息
@Bean('stream-test-topic.errors') MessageChannel testoutPutErrorChannel(){return new PublishSubscribeChannel(); }
新建異常處理service
import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.Message;import org.springframework.stereotype.Service;@Servicepublic class ErrorProducerService { @ServiceActivator(inputChannel = 'stream-test-topic.errors') public void receiveProducerError(Message message){System.out.println('receive error msg :'+message); }}
當(dāng)發(fā)生異常時(shí),由于測試類中已經(jīng)將異常捕獲,處理發(fā)送異常主要是在這里進(jìn)行。模擬,應(yīng)用與rocketMq斷開的場景。可見
首先增加配置為
spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: output: producer: group: test sync: true bindings:output: destination: stream-test-topic content-type: text/plain # 內(nèi)容格式。這里使用 JSON producer: errorChannelEnabled: true
增加相應(yīng)的模擬異常的操作
@StreamListener('input') public void receiveInput(@Payload Message message) throws ValidationException {//System.out.println('input1 receive: ' + message.getPayload() + ', foo header: ' + message.getHeaders().get('foo'));throw new RuntimeException('oops'); } @ServiceActivator(inputChannel = 'stream-test-topic.test.errors') public void receiveConsumeError(Message message){System.out.println('receive error msg'+message.getPayload()); }
代碼地址https://github.com/zhendiao/deme-code/tree/main/zp
到此這篇關(guān)于Spring Cloud Stream簡單用法的文章就介紹到這了,更多相關(guān)Spring Cloud Stream使用內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!
相關(guān)文章:
1. Python如何批量生成和調(diào)用變量2. ASP.NET MVC實(shí)現(xiàn)橫向展示購物車3. 通過CSS數(shù)學(xué)函數(shù)實(shí)現(xiàn)動畫特效4. ASP.Net Core對USB攝像頭進(jìn)行截圖5. Python獲取B站粉絲數(shù)的示例代碼6. .net如何優(yōu)雅的使用EFCore實(shí)例詳解7. windows服務(wù)器使用IIS時(shí)thinkphp搜索中文無效問題8. ajax動態(tài)加載json數(shù)據(jù)并詳細(xì)解析9. python利用opencv實(shí)現(xiàn)顏色檢測10. Python 中如何使用 virtualenv 管理虛擬環(huán)境
