2021年12月20日 星期一

Android Coroutine: Flow MutableSharedFlow Constructor 的用法

doc from SharedFlowImpl 

這週想要加強一下 Coroutine 方面的知識,像是 flow 跟 channel ,而 flow 看著看著就出現了 MutableSharedFlow 與 MutableStateFlow ,前者的建構子有三個參數,第一次看不懂,第二次看還是不懂XD,可憐的智商啊,花了一點時間克服障礙後得到的結果如下:


replay:

the number of values replayed to new subscribers (cannot be negative, defaults to zero).

新的訂閱者都可以收到 replay 裡的所有的值,都收完才會拿到最新的值


extraBufferCapacity:

the number of values buffered in addition to replay. emit does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero).

提供多的 buffer,來給慢速訂閱者使用,實際上就是有某個訂閱者,可能很久才來拿一次資料,這時候他拿到的資料會是 buffer 裡最舊的。


onBufferOverflow:

configures an emit action on buffer overflow. Optional, defaults to suspending attempts to emit a value. Values other than BufferOverflow.SUSPEND are supported only when replay > 0 or extraBufferCapacity > 0. Buffer overflow can happen only when there is at least one subscriber that is not ready to accept the new value. In the absence of subscribers only the most recent replay values are stored and the buffer overflow behavior is never triggered and has no effect.

就是當 buffer overflow 的行為,分為 SUSPEND, DROP_OLDEST, DROP_LATEST


SUSPEND

當資料要爆的時候就會停止發送,等到資料被拿走才會繼續發送

DROP_OLDEST/DROP_LATEST

替換掉最新/最舊的資料


驗證

假設我的

replay= 2, 

extraBufferCapacity= 2 , 

onBufferOverflow = DROP_OLDEST,

則我發射了一連串資料,從1到5發送順序會是這樣:

buffer-1 buffer-2 replay3 replay4 now5  

繼續下去會是,會替換最舊的資料

buffer-2 buffer-3 replay4 replay5 now6  


實例

有兩個訂閱者(A, B)都是從頭開始訂閱

A 為正常的訂閱者

 repeatOnLifecycle(Lifecycle.State.STARTED){

                    flowVM.mutableSharedFlow.collect{

                        log("FIRST $it")

                    }

                }

則他會拿到123456

B 為一個比較忙的訂閱者,他每次拿完資料後會去做點別的事再回來

   repeatOnLifecycle(Lifecycle.State.STARTED){

                    flowVM.mutableSharedFlow.collect{

                        log("SECOND SUSPEND $it")

                        delay(3000)

                        log("SECOND $it")

                    }

                }

B 一開始拿到1後過了三秒回來假設目前已經發射到4了,則 B 會拿到2

buffer-1(已被拿) replay2 replay3 now4  

若 B 一開始拿到1後過了十秒回來假設目前已經發射到11了,則 B 會拿到7

 buffer-7 buffer-8 replay9 replay10 now11

如果有新的訂閱者從 now6  開始訂閱,則會收到4,5,6,replay 作用於途中加入的訂閱者,extraBufferCapacity 作用於慢速訂閱者。


狀況為 SUSPEND

replay= 2, 

extraBufferCapacity= 2 , 

onBufferOverflow = SUSPEND,

一樣有快慢訂閱者(A, B),當B拿了1 後過了十秒才回來則,發射者會因為 buffer 已滿, replay + extraBufferCapacity = 4,當發射到5就會 suspend,直到 B 拿到確實把1消耗掉,1一被消耗掉就會繼續發射6,然後繼續 suspend 等待 B 回來拿2

buffer-1 buffer-2 replay3 replay4 now5  


大概四這樣,但是我得先想想使用情境是啥...


參考:

https://ithelp.ithome.com.tw/articles/10278861

https://www.raywenderlich.com/22030171-reactive-streams-on-kotlin-sharedflow-and-stateflow#toc-anchor-005




沒有留言:

張貼留言