spring 整合kafka監(jiān)聽消費的配置過程
最近項目里有個需求,要消費kafka里的數(shù)據(jù)。之前也手動寫過代碼去消費kafka數(shù)據(jù)。但是轉(zhuǎn)念一想。既然spring提供了消費kafka的方法。就沒必要再去重復(fù)造輪子。于是嘗試使用spring的API。
項目技術(shù)背景,使用springMVC,XML配置和注解相互使用。kafka的配置都是使用XML方式。
整合過程1. 引入spring-kafka的依賴包
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency>
2. 在spring的xml文件里增加配置項,也可以單獨創(chuàng)建一個spring-context-XX.xml文件。
<!-- consumer configuration 該配置項可以根據(jù)自己業(yè)務(wù)的實際需求做增加或刪除--> <bean class='java.util.HashMap'> <constructor-arg> <map><entry key='bootstrap.servers' value='${kafka.bootstrap.servers}' /><entry key='group.id' value='group' /><entry key='enable.auto.commit' value='true' /><entry key='auto.commit.interval.ms' value='3000' /><entry key='session.timeout.ms' value='10000' /><entry key='key.deserializer' value='org.apache.kafka.common.serialization.StringDeserializer' /><entry key='value.deserializer' value='org.apache.kafka.common.serialization.StringDeserializer' /> </map> </constructor-arg> </bean> <!-- create factory 該類是spring jar包里提供,就這么配置--> <bean class='org.springframework.kafka.core.DefaultKafkaConsumerFactory'> <constructor-arg> <ref bean='consumerProperties' /> </constructor-arg> </bean> <!-- 自定義的消費類,需要實現(xiàn)spring的接口 --> <bean /> <!-- 該類也是jar包里提供的,注入的監(jiān)聽類是自己定義的,topic名稱是配置文件引入的--> <bean class='org.springframework.kafka.listener.ContainerProperties'> <constructor-arg name='topics' value='${kafka.paypal.topic.name}'/> <property name='messageListener' ref='payPalConsumer' /> </bean> <!-- 改類也是jar里提供的,把這個containerProperties和consumerfactory 注入 --> <bean init-method='doStart'> <constructor-arg ref='consumerFactory' /> <constructor-arg ref='containerProperties' /> </bean>
2. 自定義消費者類,消費者類依然可以使用注解。
/** * get msg from kafka */@Component public class PayPalConsumer implements MessageListener<String, String> { private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class); @Autowired private XXService XXService; @Override public void onMessage(ConsumerRecord<String, String> authorizeRecord) { String value = authorizeRecord.value(); if (StringUtils.isEmpty(value)){ logger.warn('receive message from kafka is null'); return; } logger.info('receive message from kafka is {}',value); }}
使用這個步驟配置,一次性過。非常順利。
到此這篇關(guān)于spring 整合kafka監(jiān)聽消費的配置過程的文章就介紹到這了,更多相關(guān)spring 整合kafka內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!
相關(guān)文章:
1. python b站視頻下載的五種版本2. 如何通過vscode運行調(diào)試javascript代碼3. 半小時實現(xiàn)Java手?jǐn)]網(wǎng)絡(luò)爬蟲框架(附完整源碼)4. 測試模式 - XSL教程 - 55. 教你JS更簡單的獲取表單中數(shù)據(jù)(formdata)6. JAVA抽象類及接口使用方法解析7. python如何寫個俄羅斯方塊8. 《CSS3實戰(zhàn)》筆記--漸變設(shè)計(一)9. Python結(jié)合百度語音識別實現(xiàn)實時翻譯軟件的實現(xiàn)10. JavaScript設(shè)計模式之策略模式實現(xiàn)原理詳解
![半小時實現(xiàn)Java手?jǐn)]網(wǎng)絡(luò)爬蟲框架(附完整源碼)](http://www.aoyou183.cn/attached/image/15.jpg)