본문 바로가기
프로그래밍 놀이터/Kotlin, Coroutine

[coroutine] Back-pressure 대응하기

by 돼지왕 왕돼지 2022. 2. 2.
반응형

 

-
Back pressure 는 한국어로 "배압" 이라고 하며,
Flow 의 세계에서는 emit 되는 속도보다 collect 되는 속도가 느린 현상을 이야기한다. (돼왕: input 이 쌓여 buffer 가 필요한 상황)
이에 대한 해결방법은 여러가지가 있겠다.

 

-
아래와 같은 기본 로직이 있다고 하자.
이 로직은 emit 은 1초 단위로 되는데, collect 는 3초 이상을 소비한다.
즉 back pressure 가 발생하는 것이다.

var start = 0L
fun currTime() = System.currentTimeMillis()
fun emitter(): Flow = (1..5)
	.asFlow()
	.onStart{ start = currTime() }
	.onEach{
		delay(1000)
		print("Emit $it (${currTime() - start}ms)")
	}

fun main() = runBlocking{
	val time = measureTimeMillis{
		emitter()
			.collect{
				print("\nCollect $it starts (${currTime() - start}ms) ")
				delay(3000)
				print("\nCollect $it ends (${currTime() - start}ms) ")
			}
	}
	print("\nCollected in $time ms")
}

해당 로직을 그대로 수행하면,

emit -> collect -> emit -> collect -> ... 가 되기 때문에

총 delay 1000 + 3000 = 4000ms
1..5 이기 때문에 총 5회
4초 * 5회 = 약 20초가 걸린다. ( print 시간 등이 있으니 20초가 조금 넘을 것이다. )

 

-
back pressure 를 해결하는 한 가지 방법은 emit 을 thread 에서 처리하는 것이다.

emitter()
	.flowOn(Dispatchers.Default)
	.collect{
		...
	}

이렇게 하면 emit 하는 thread 가 Default dispatcher 를 사용하게 되고, ( flowOn 이 호출되기 전의 로직들이 해당 dispatcher 에서 동작한다. )

collect 하는 thread 는 main 을 사용하게 되기 때문에 emit 에서 소요되는 시간이 줄어든다.
그러나 여전히 collect 는 순차적으로 수행하기 때문에 시간이 오래 걸린다.
게다가 emit 하는 thread 와 collect 하는 thread 의 차이가 발생한다는 단점도 있다.

 

-
collect 안에서 delay (suspend 함수)를 사용하기 때문에 buffer 를 사용하는 방법도 있겠다.

emitter()
	.buffer()
	.collect{
		...
	}

이렇게 하면 collect 안에서 delay 하는 순간 emitter 에서는 계속 emit 을 하여 buffer 에 값들을 쌓아놓을 수 있다.

이는 emit 하는 thread 와 collect 하는 thread 간의 thread 가 동일하다는 장점이 있다.
하지만 만약 collect 자체가 suspending function 을 사용하지 않고 단순 오래 걸리는 경우라면 해결책이 될 수 없다.

 

-
발행하는 모든 값을 처리할 필요가 없다면 conflation 을 사용하는 방법도 있다.

emitter()
	.conflation()
	.collect{
		...
	}

이렇게 하면 collect 안에서 suspending function 이 호출되었을 때 emitter 는 계속 값을 발행하지만, collect 가 수용할 수 없는 상태라면 최신값을 제외하고 모든 값을 버린다.
다시 말하면, collect 는 처리할 수 있는 최신 값만을 처리한다고 보면 된다.

 

 

참고 : https://medium.com/@davidecerbo/backpressure-in-kotlin-flows-9324d86c964e

 

 

반응형

댓글