亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁技術文章
文章詳情頁

詳解Java中的reactive stream協議

瀏覽:2日期:2022-08-11 13:06:14
目錄背景什么是reactive stream深入了解java版本的reactive streamPublisherSubscriberSubscriptionProcessorJDK中reactive stream的實現總結背景

每個數據流都有一個生產者一個消費者。生產者負責產生數據,而消費者負責消費數據。如果是同步系統,生產一個消費一個沒什么問題。但是如果在異步系統中,就會產生問題。

因為生產者無法感知消費者的狀態,不知道消費者到底是繁忙狀態還是空閑狀態,是否有能力去消費更多的數據。

一般來說數據隊列的長度都是有限的,即使沒有做限制,但是系統的內存也是有限的。當太多的數據沒有被消費的話,會導致內存溢出或者數據得不到即使處理的問題。

這時候就需要back-pressure了。

如果消息接收方消息處理不過來,則可以通知消息發送方,告知其正在承受壓力,需要降低負載。back-pressure是一種消息反饋機制,從而使系統得以優雅地響應負載, 而不是在負載下崩潰。

而reactive stream的目的就是用來管理異步服務的流數據交換,并能夠讓接收方自主決定接受數據的頻率。back-pressure就是reactive stream中不可或缺的一部分。

什么是reactive stream

上面我們講到了reactive stream的作用,大家應該對reactive stream有了一個基本的了解。這里我們再給reactive stream做一個定義:

reactive stream就是一個異步stream處理的標準,它的特點就是非阻塞的back pressure。

reactive stream只是一個標準,它定義了實現非阻塞的back pressure的最小區間的接口,方法和協議。

所以reactive stream其實有很多種實現的,不僅僅是java可以使用reactive stream,其他的編程語言也可以。

reactive stream只是定義了最基本的功能,各大實現在實現了基本功能的同時可以自由擴展。

目前reactive stream最新的java版本是1.0.3,是在2019年8月23發布的。它包含了java API,協議定義文件,測試工具集合和具體的實現例子。

深入了解java版本的reactive stream

在介紹java版本的reactive stream之前,我們先回顧一下reactive stream需要做哪些事情:

1.能夠處理無效數量的消息

2.消息處理是有順序的

3.可以異步的在組件之間傳遞消息

4.一定是非阻塞和backpressure的

為了實現這4個功能,reactive stream定義了4個接口,Publisher,Subscriber,Subscription,Processor。這四個接口實際上是一個觀察者模式的實現。接下來我們詳細來分析一下各個接口的作用和約定。

Publisher

先看下Publisher的定義:

public interface Publisher<T> { public void subscribe(Subscriber<? super T> s);}

Publisher就是用來生成消息的。它定義了一個subscribe方法,傳入一個Subscriber。這個方法用來將Publisher和Subscriber進行連接。

一個Publisher可以連接多個Subscriber。

每次調用subscribe建立連接,都會創建一個新的Subscription,Subscription和subscriber是一一對應的。

一個Subscriber只能夠subscribe一次Publisher。

如果subscribe失敗或者被拒絕,則會出發Subscriber.onError(Throwable)方法。

Subscriber

先看下Subscriber的定義:

public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete();}

Subscriber就是消息的接收者。

在Publisher和Subscriber建立連接的時候會觸發onSubscribe(Subscription s)方法。

當調用Subscription.request(long)方法時,onNext(T t)會被觸發,根據request請求參數的大小,onNext會被觸發一次或者多次。

在發生異?;蛘呓Y束時會觸發onError(Throwable t)或者onComplete()方法。

Subscription

先看下Subscription的定義:

public interface Subscription { public void request(long n); public void cancel();}

Subscription代表著一對一的Subscriber和Publisher之間的Subscribe關系。

request(long n)意思是向publisher請求多少個events,這會觸發Subscriber.onNext方法。

cancel()則是請求Publisher停止發送信息,并清除資源。

Processor

先看下Processor的定義:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

Processor即是Subscriber又是Publisher,它代表著一種處理狀態。

JDK中reactive stream的實現

在JDK中java.util.concurrent.Flow就是reactive stream語義的一種實現。

Flow從JDK9就開始有了。我們看下它的結構:

詳解Java中的reactive stream協議

從上圖我們可以看到在JDK中Flow是一個final class,而Subscriber,Publisher,Subscription,Processor都是它的內部類。

總結

reactive stream的出現有效的解決了異步系統中的背壓問題。只不過reactive stream只是一個接口標準或者說是一種協議,具體的實現還需要自己去實現。

以上就是詳解Java中的reactive stream協議的詳細內容,更多關于Java中的reactive stream協議的資料請關注好吧啦網其它相關文章!

標簽: Java
相關文章:
主站蜘蛛池模板: 精品国偷自产在线不卡短视频 | 国产亚洲精品aa在线看 | 99在线精品国产不卡在线观看 | 久久久中文字幕日本 | 97se狠狠狠狠狼亚洲综合网 | 亚洲精品久久青草 | 中文字幕在线久热精品 | 欧美亚洲制服 | 免费国产最新进精品视频 | 亚洲国产精品自在在线观看 | 婬荡少妇21p | 2022国内精品免费福利视频 | 国内精品视频在线播放一区 | 久久成人18免费 | 亚洲第一成人天堂第一 | 日韩欧美国产偷亚洲清高 | 在线免费观看黄色小视频 | 亚洲成年网站在线观看 | 国产成人精品三区 | 久久久精品影院 | 国产伦理一区二区三区 | 美国美女黄色片 | 草草视频免费看 | 成人午夜网址 | 日韩一级欧美一级一级国产 | 国产农村乱色xxxx | 日韩高清在线亚洲专区vr | 欧美高清在线精品一区二区不卡 | 国产高清在线观看麻豆 | 羞羞答答www网址进入在线观看 | 毛片黄片一级片 | 国产一级免费 | 午夜香蕉网 | 国产精品搭讪系列在线观看 | 草莓视频caomei888 | 成 人 黄 色 视频播放1 | 国产99久9在线 | 72种姿势欧美久久久久大黄蕉 | a级国产乱理论片在线观看看 | 成人免费淫片在线费观看 | 色播影院性播12306影视 |