-
Back pressure 는 한국어로 "배압" 이라고 하며,
Flow 의 세계에서는 emit 되는 속도보다 collect 되는 속도가 느린 현상을 이야기한다. (돼왕: input 이 쌓여 buffer 가 필요한 상황)
이에 대한 해결방법은 여러가지가 있겠다.
-
아래와 같은 기본 로직이 있다고 하자.
이 로직은 emit 은 1초 단위로 되는데, collect 는 3초 이상을 소비한다.
즉 back pressure 가 발생하는 것이다.
var start = 0L
fun currTime() = System.currentTimeMillis()
fun emitter(): Flow = (1..5)
.asFlow()
.onStart{ start = currTime() }
.onEach{
delay(1000)
print("Emit $it (${currTime() - start}ms)")
}
fun main() = runBlocking{
val time = measureTimeMillis{
emitter()
.collect{
print("\nCollect $it starts (${currTime() - start}ms) ")
delay(3000)
print("\nCollect $it ends (${currTime() - start}ms) ")
}
}
print("\nCollected in $time ms")
}
해당 로직을 그대로 수행하면,
emit -> collect -> emit -> collect -> ... 가 되기 때문에
총 delay 1000 + 3000 = 4000ms
1..5 이기 때문에 총 5회
4초 * 5회 = 약 20초가 걸린다. ( print 시간 등이 있으니 20초가 조금 넘을 것이다. )
-
back pressure 를 해결하는 한 가지 방법은 emit 을 thread 에서 처리하는 것이다.
emitter()
.flowOn(Dispatchers.Default)
.collect{
...
}
이렇게 하면 emit 하는 thread 가 Default dispatcher 를 사용하게 되고, ( flowOn 이 호출되기 전의 로직들이 해당 dispatcher 에서 동작한다. )
collect 하는 thread 는 main 을 사용하게 되기 때문에 emit 에서 소요되는 시간이 줄어든다.
그러나 여전히 collect 는 순차적으로 수행하기 때문에 시간이 오래 걸린다.
게다가 emit 하는 thread 와 collect 하는 thread 의 차이가 발생한다는 단점도 있다.
-
collect 안에서 delay (suspend 함수)를 사용하기 때문에 buffer 를 사용하는 방법도 있겠다.
emitter()
.buffer()
.collect{
...
}
이렇게 하면 collect 안에서 delay 하는 순간 emitter 에서는 계속 emit 을 하여 buffer 에 값들을 쌓아놓을 수 있다.
이는 emit 하는 thread 와 collect 하는 thread 간의 thread 가 동일하다는 장점이 있다.
하지만 만약 collect 자체가 suspending function 을 사용하지 않고 단순 오래 걸리는 경우라면 해결책이 될 수 없다.
-
발행하는 모든 값을 처리할 필요가 없다면 conflation 을 사용하는 방법도 있다.
emitter()
.conflation()
.collect{
...
}
이렇게 하면 collect 안에서 suspending function 이 호출되었을 때 emitter 는 계속 값을 발행하지만, collect 가 수용할 수 없는 상태라면 최신값을 제외하고 모든 값을 버린다.
다시 말하면, collect 는 처리할 수 있는 최신 값만을 처리한다고 보면 된다.
끝
참고 : https://medium.com/@davidecerbo/backpressure-in-kotlin-flows-9324d86c964e
'프로그래밍 놀이터 > Kotlin, Coroutine' 카테고리의 다른 글
[Effective Kotlin] Item2 : Minimize the scope of variables (0) | 2022.02.16 |
---|---|
[Effective Kotlin] Item1 : Limit mutability (0) | 2022.02.15 |
[coroutine] withContext vs async (0) | 2022.02.01 |
[Coroutine] Exception handling in coroutine (0) | 2022.01.30 |
[Coroutine] Coroutine scope functions (0) | 2022.01.29 |
댓글