본문 바로가기
프로그래밍 놀이터/Kotlin, Coroutine

[coroutine] Asyncronous Flow

by 돼지왕 왕돼지 2020. 3. 11.
반응형



Representing multiple values


-

복수개의 값은 kotlin 에서 collection 을 사용하여 표현할 수 있다.

fun foo() : List<Int> = listOf(1,2,3)

fun main() {
    foo.forEach { value -> println(value) }
}

// 1
// 2
// 3


* Sequences


-

어떤 숫자를 계산하는데 CPU 를 blocking 하는 code 를 사용하고, 그 과정에 각각 100ms 가 걸린다면 우리는 이 숫자들을 Sequence 로 표현할 수 있다.

fun foo() : Sequence<Int> = sequence{ // this 는 SequenceScope 이다. Sequence 는 coroutine 이 아닌 kotlin 의 것!
    for ( i in 1..3 ){
        Thread.sleep(100) // 계산하는 척하는 코드. coroutineScope 이 아니라 delay 는 사용 불가능하다.
        yield(i) // return value to iterator & and suspend until next value requested. suspend func 이다.
    }
}

fun main(){
    foo().forEach{ value -> println(value) }
}

// 1
// 2
// 3

결과는 동일하지만 각 출력 사이에 100ms delay 가 있다.



* Suspending functions


-

계산하는 로직은 main thread 에서 돌게 된다.

이 값들을 async 로 계산하려면, foo 함수에 suspend modifier 를 붙여야 한다.

그래서 blocking 없이 값들을 return 할 수 있게 해야 한다.

suspend fun foo(): List<Int> {
    delay(1000)
    return listOf(1,2,3)
}

fun main() = runBlocking<Unit>{
    foo().forEach { value -> println(value) }
}

// 1
// 2
// 3

결과는 같지만 각 결과를 1초 후에 찍는다.



* Flows


-

List<Int> result type 을 사용한다는 것은 값들을 한번에 발행한다는 이야기이다.

value 를 stream 으로 표현하면서 async 로 계산하려면 우리는 Flow<Int> 를 사용해야 한다.

sync 계산 값인 Sequence<Int> 를 사용했던 것처럼..

fun foo() : Flow<Int> = flow{ // this 는 FlowCollector, interface 에 emit 함수 하나만 정의되어 있다.
    for( i in 1..3){
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit>{
    launch{
        for(k in 1..3){
            println("I'm not blocked $k")
            delay(100)
        }
    }
    foo().collect { value -> println(value) } // collect 는 suspend function 이다.
}

// I'm not blocked 1
// 1
// I'm not blocked 2
// 2
// I'm not blocked 3
// 3

앞의 예제와 Flow 의 다른 점을 주목해보자.

    Flow type 에 대한 builder function 을 "flow" 이다.

    flow 안의 코드는 suspend 될 수 있다.

    foo() 함수는 더 이상 suspend modifier 로 마킹할 필요가 없다.

    "emit" 함수를 통해 값이 emit 된다.

    "collect" 함수를 통해 값이 collect 된다.





Flows are cold


-

Flow 는 sequence 와 비슷하게 cold stream 이다.

flow builder 는 flow 가 collected 되기 전까지 불리지 않는다.

fun foo() : Flow<Int> = flow{
    println("Flow Started")
    for(i in 1..3){
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit>{
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}

// Calling foo...
// Calling collect...
// Flow Started
// 1
// 2
// 3
// Calling collect again...
// Flow Started
// 1
// 2
// 3

foo() 함수 자체는 빠르게 return 하고 아무것도 기다리지 않는다. 그래서 suspend 를 붙일 필요가 없는 것이다.

flow 는 collect 가 불릴때마다 다시 시작한다. 그래서 collect 를 다시 불렀을 때 "Flow started" 를 다시 볼 수 있는 것이다.





Flow cancellation


-

Flow 는 coroutine 의 취소와 일반적으로 협조한다.

flow 의 내부 구조는 추가적인 취소 포인트를 도입하지 않았다.

이것은 취소에 대해 완전히 투명하다.

일반적으로 flow collection 은 flow 가 취소 가능한 delay 같은 suspending function 을 불러 suspend 되었을 때만 취소될 수 있다. 그렇지 않으면 취소될 수 없다. ( 돼왕 : 결론은 flow 에 대한 취소는 suspending function 에 대한 취소만 가능하다는 말 )

fun foo() : Flow<Int> = flow{
    for(i in 1..3){
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit>{
    withTimeoutOrNull(250) {
        foo().collect{ value -> println(value) }
    }
    println("Done")
}

// Emitting 1
// 1
// Emitting 2
// 2
// Done




Flow builders


-

flow{ ... } 은 가장 기본적인 녀석이다.

다음과 같이 더 쉬운 flow 정의도 있다.

    "flowOf" builder 를 사용하여 고정된 value set 을 emit 할 수 있다.

    여러가지 collection 과 sequence 가 .asFlow() extension function 을 통해 전환될 수 있다.

fun main() = runBlocking<Unit>{
    (1..3).asFlow().collect { value -> println(value) }
}

// 1
// 2
// 3




Intermediate flow operators


-

flow 는 operator 로 transform 될 수 있다.

intermediate(중간) operator 들은 upstream flow 를 받아 downstream flow 를 return 된다.

이 operator 들도 cold 이다.

해당 operator 들의 호출은 suspending function 이 아니다.

이것은 transformed flow 로 빠른 return 을 한다.



-

기본적인 operator 로는 map, filter 와 같은 친숙한 녀석들이 있다.

sequence 와의 중요한 차이점은 operator 함수 안의 block 은 suspending function 을 호출할 수 있다는 것이다.

suspend fun performRequest(request:Int):String{
    delay(1000)
    return "response $request"
}

fun main() = runBlocking<Unit>{
    (1..3).asFlow()
        .map { request -> performRequest(request) }
        .collect { response -> println( response ) }
}

// response 1
// response 2
// response 3


* Transform operator


-

flow 변환 operator 중에서 가장 일반적인 것은 "transform" 이라고 불린다.

map, filter 와 같은 간단한 변환을 수행할 수도 있고, 아주 복잡한 변환을 할 수도 있다.

transform operator 를 사용해서 우리는 임의의 값과 임의의 횟수를 emit 할 수 있다.

suspend fun performRequest(request:Int):String{
    delay(1000)
    return "response $request"
}

fun main() = runBlocking<Unit>{
    (1..3).asFlow()
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println( response ) }
}

// Making request 1
// respone 1
// Making request 2
// respone 2
// Making request 3
// respone 3


* Size-limiting operators


-

"take" 와 같은 size 제한 중간 operator 들은 명시된 제한에 도달하면 flow 의 수행을 cancel 시킨다.

coroutine 의 취소는 언제나 exception 을 던지는 것으로 되기 때문에 try{ ... } finally { ... } 와 같은 res 관리 코드들이 잘 들어가야 한다.

fun numbers(): Flow<Int> = flow {
    try{
        emit(1) // emit 은 suspend function 이다.
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally{
        println("Finally in numbers")
    }
    // 취소시 AbortFlowException 가 발생하며 이는 CancellationException 의 subclass 이다.
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2)
        .collect { value -> println(value) }
}

// 1
// 2
// Finally in numbers

돼왕 : 결과를 보아하니 limit opreator 가 있는 경우 emit 함수 안에서 값 발행 후 limit 조건이 충족하면 AbortFlowException 을 던지는 것으로 보인다. 그래서 This line will not execute 가 불리지 않은 것.





Terminal flow operators


-

Terminal operator 는 suspending function 으로 flow 를 수행시킨다.

"collect" operator 가 가장 기본적인 녀석이며, 다른 terminal operator 들도 있다.

    "toList", "toSet" 과 같은 collection 전환 operator

    "first" 를 통해 첫번째 값만 취할 수도 있고, "single" 을 통해 유일한 값을 취할 수도 있다.

    "reduce" 나 "fold" 를 통해 flow 를 reducing 할 수도 있다.

fun main = runBlocking<Unit>{
    val sum = (1..5).asFlow()
        .map { it * it }
        .reduce { a, b -> a + b }
    println(sum)
}

// 55




Flows are sequential


-

각각의 flow collection 은 특별한 operator 를 명시하지 않는 한 순차적으로 수행된다.

collection 작업은 terminal operator 가 수행되는 순간 coroutine 속에서 수행된다.

기본적으로는 새로운 coroutine 이 launch 되지 않는다.

각각의 값 emit 은 intermediate operator 에서 upstream 을 받아 downstream 으로 전환하며, 이것이 terminal operator 에 최종적으로 전달된다.

fun main() = runBlocking<Unit>{
    (1..5).asFlow()
        .filter{
            println("Filter $it")
            it %2 == 0
        }
        .map{
            println("Map $i")
            "string $it"
        }
        .collect{
            println("Collect $it")
        }
}

// Filter 1
// Filter 2
// Map 2
// Collect string 2
// Filter 3
// Filter 4
// Map 4
// Collect string 4
// Filter 5







Flow context


-

flow 의 collection 은 항상 호출하는 coroutine 의 context 에서 수행된다.

withContext(context){
    foo.collect { value ->
        println(value) // 명시된 context 에서 수행된다.
    }
}

이런 flow 의 성질을 "context preservation(context 보존)" 이라 부른다.



-

기본적으로 flow { ... } builder 안의 코드는 전달받은 context 에서 수행된다.

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow{
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit>{
    foo().collect { value -> log("Collected $value") }
}

// [main @coroutine#1] Started foo flow
// [main @coroutine#1] Collected 1
// [main @coroutine#1] Collected 2
// [main @coroutine#1] Collected 3

foo().collect 가 main thread 에서 불렸기 때문에, foo 의 body 역시 main thread 에서 불린다.

이것이 빠르게 실행하고, caller 를 block 하지 않으면서도 수행 context 를 신경쓰지 않는 가장 완벽한 기본이다.



* Wrong emission withContext


-

CPU 를 소비하는 코드 오래 동작하는 코드들은 Dispatchers.Default context 에서 실행되야 할 수 있으며, UI-updating code 는 Dispatchers.Main 에서 수행되어야 한다.

일반적으로 kotlin coroutine 에서는 withContext 가 context change 를 위해 사용된다.

그러나 flow { ... } builder 는 context preservation 속성을 존중해야 하므로 다른 context 에서 emit 하는 것을 허락하지 않는다.

fun foo(): Flow<Int> = flow{
    withContext(Dispatchers.Default){
        for (i in 1..3) {
            Thread.sleep(100)
            emit(i)
    }
}

fun main() = runBlocking<Unit>{
    foo().collect { value -> println(value) }
}

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:

                Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],

                but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].

                Please refer to 'flow' documentation or use 'flowOn' instead

        at ...



* flowOn operator


-

예외에서 알려준데로, flowOn 함수가 emit 하는 context 를 변경할 수 있다.

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow{
    for (i in 1..3) {
        Thread.sleep(100)
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking<Unit>{
    foo().collect { value -> log("Collected $value") }
}

// [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
// [main @coroutine#1] Collected 1
// [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
// [main @coroutine#1] Collected 2
// [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
// [main @coroutine#1] Collected 3

collection 은 main thread 에서 수행되지만, flow { ... } 가 bg thread 에서 작동함을 눈여겨보자.

또 하나 눈여겨 볼 것은 flowOn operator 가 순차적인 수행도 변경했다는 것이다. ( 돼왕 : coroutine#1, #2 순서가 아니다. )

collection 은 coroutine#1 에서 수행되었지만, emit 은 coroutine #2 에서 수행되었다.

flowOn 이 새로운 upstream flow 를 위해 제공된 CourinteDispatcher를 사용하여 새로운 coroutine 을 만든 것이다.





Buffering


-

flow 의 다른 파트들을 다른 coroutine 에서 돌리는 것은 flow 를 collect 하는데 걸리는 전체 시간을 줄이는 측면에서 유용하다. 

특히 오래 걸리는 async operation 일 경우 그렇다.

fun foo(): Flow<Int> = flow{
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit>{
    val time = measureTimeMillis {
        foo().collect { value ->
            delay(300)
            println(value)
        }
    }
    println("Collected in $time ms")
}

// 1
// 2
// 3
// Collected in 1220ms

각각의 숫자를 출력하는데 400ms 정도를 사용했다.

( 돼왕 : main thread 하나로만 동작하기 때문에 collect 가 다음 숫자를 요청할 때까지 flow 의 다음 코드는 돌지 않았다. emit 에서 멈춰있었다고 보면 되겠다. )


flow 에 buffer 를 적용함으로써 병렬적으로 value 를 collect 할 수 있다.

fun foo(): Flow<Int> = flow{
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit>{
    val time = measureTimeMillis {
        foo()
            .buffer()
            .collect { value ->
                delay(300)
                println(value)
        }
    }
    println("Collected in $time ms")
}

// 1
// 2
// 3
// Collected in 1071 ms

collect 에서는 실제로는 첫번째 100ms 만 기다리고, 다른 각각의 숫자를 처리하는데 300ms 만 썼다.

( 돼왕 : 앞선 예제에서는 collect 가 다음 요청을 할 때까지 emit 에서 멈춰있었다면, buffer 가 적용된 후에는 emit 에서 멈춰 있지 않고, flow logic 이 계속 돌아서 buffer 에 계속 값을 쌓아놓았다고 볼 수 있다. )


눈여겨 볼 것은 flowOn operator 도 CoroutineDispatcher 를 변경해야만 할 때 동일한 buffering mechanism 을 사용한다.

여기서는 context change 없이 하기 위해 buffering 을 사용했다고 볼 수 있다.



* Conflation


-

flow 가 부분적인 결과를 보여줄 필요가 있거나, 작업의 현재 상태만 update 하는 것이 필요하다면, 각각의 값을 처리할 필요가 없다.

대신 가장 최신의 것만 유지하면 된다. collector 가 값을 처리하는게 너무 느릴 때도 유효할 수 있다.

이 때는 "conflate" operator 를 사용해서 중간값들을 생략할 수 있다.

fun foo(): Flow<Int> = flow{
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit>{
    val time = measureTimeMillis {
        foo()
            .conflate()
            .collect { value ->
                delay(300)
                println(value)
        }
    }
    println("Collected in $time ms")
}

// 1
// 3
// Collected in 758ms

첫번째 숫자가 처리되는 동안, 세번째 숫자가 생성되었다. 그래서 두번째 숫자가 conflated 되었고, 가장 최근 값인 세번째 값이 collector 에 전달되었다.

돼왕 : Conflate 는 '융합(합체)하다'라는 뜻이다.



* Processing the latest value


-

Conflation 은 emitter 와 collector 모두가 느릴 때 처리를 빠르게 하는 방법 중 하나이다.

중간의 emit 된 value 를 drop 시킨다.

다른 방법은 느린 collector 를 취소시키고 새로운 값이 emit 될 때마다 restart 시키는 방법이다.

xxxLatest 라는 operator 시리즈가 있는데, xxx 의 기본 로직을 수행하지만, 새로운 value 에 대한 block 이 있다면 cancel 을 시켜버린다.

fun foo(): Flow<Int> = flow{
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit>{
    val time = measureTimeMillis {
        foo()
            .collectLatest { value ->
                println("Collecting $value")
                delay(300)
                println("Done $value")
        }
    }
    println("Collected in $time ms")
}

// Collecting  1
// Collecting  2
// Collecting  3
// Done 3
// Collected in 741ms




Composing multiple flows


* Zip


-

Sequence.zip 과 같이 flows 도 zip 을 가지고 있다.

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow()
    val strs = flowOf("one", "two", "three")
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println( it ) }
}

// 1 -> one
// 2 -> two
// 3 -> three


* Combine


-

flow 가 가장 최신 값을 표시하는 경우, 가장 최근 값을 바탕으로 계산이 이루어져야 할 수 있다.

이것을 위한 것이 combine 이다.


예를 들어 한 숫자가 매 300ms 마다 업데이트가 되고 string 이 400ms 마다 업데이트가 된다면,

이들을 묶는 것은 zip 은 여전히 동일한 결과를 낸다. 다만, 매 400ms 마다 값이 출력된다는 차이만 있다.

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) }
    val strs = flowOf("one", "two", "three").onEach { delay(400) }
    val startTime = System.currentTimeMillis()
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { value ->
            println( "$value at ${System.currentTimeInMillis() - startTime} ms from start" ) 
    }
}

// 1 -> one at 410 ms from start
// 2 -> two at 820 ms from start
// 3 -> three at 1230 ms from start

하지만 여기서 zip 대신 combine 으로 바꾼다면 결과가 이렇게 달라진다.

// 1 -> one at 452 ms from start
// 2 -> one at 651 ms from start
// 2 -> two at 854 ms from start
// 3 -> two at 952 ms from start
// 3 -> three at 1256 ms from start




Flattening flows


-

flow 는 sequence of value 를 async 로 받을 수 있는데, 이는 각각의 값이 다른 sequence of value 를 return 하는 경우를 아주 쉬운 예로 들 수 있다. ( 돼왕 : flow 가 flow 를 value 로 emit 할 수 있다는 의미이다. )

fun requestFlow(i:Int) : Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

(1..3).asFlow().map{ requestFlow(it) }

이렇게 하면 Flow<Flow<String>> 이 반환되는데, single flow 로 flatten (평평하게 하다.) 할 필요가 있을 수 있다.

Collection 과 sequence 는 flatten 과 flatMap operator 가 있다.

하지만 flow 의 async 의 특성상 flatten 은 추가적인 모드를 지원한다.



* flatMapConcat


-

flatMapConcat, flattenConcat operator 를 통해서 concat 을 할 수 있다.

sequence 에 대한 operator 와 아주 비슷하다.

이들은 collect 를 하기 전에 inner flow 의 결과를 기다린다.

fun requestFlow(i:Int) : Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        .flatMapConcat { requestFlow(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

// 1: First at 121 ms from start
// 1: Second at 622 ms from start
// 2: First at 727 ms from start
// 2: Second at 1227 ms from start
// 3: First at 1328 ms from start
// 3: Second at 1829 ms from start


* flatMapMerge


-

flatten 의 다른 모드는 incoming flow 를 병렬적으로 모은 후에 single flow 로 머지하는 것이다.

그래서 value 가 바로바로 emit 될 수 있도록 한다.

이는 flatMapMerge 와 flattenMerge opreator 를 통해 수행된다.

이들은 optional 한 concurrency param 을 받는데, 이것이 동시에 돌 수 있는 concurrent flow 갯수를 limit 을 결정한다.

DEFAULT_CONCURRENCY 가 기본값이다.

fun requestFlow(i:Int) : Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        .flatMapMerge { requestFlow(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

// 1: First at 136 ms from start
// 2: First at 231 ms from start
// 3: First at 333 ms from start
// 1: Second at 639 ms from start
// 2: Second at 732 ms from start
// 3: Second at 833 ms from start

flatMapMerge 는 { requestFlow(it) } 코드를 순차적으로 불렀지만, 결과가 되는 flow 는 병렬적으로 값을 내뱉었다.

이는 map 을 먼저 부르고 flattenMerge 를 그 다음에 부르는 것과 동일하다.



* flatMapLatest


-

collectLatest operator 와 비슷하게 Latest 로 flattening 하는 mode 가 있다.

이 녀석은 collector 가 처리가 느릴 경우 cancel 시키고 새 값을 emit 시킨다.

fun requestFlow(i:Int) : Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
        .flatMapLatest { requestFlow(it) }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

// 1: First at 139 ms from start
// 2: First at 247 ms from start
// 3: First at 350 ms from start
// 3: Second at 850 ms from start

flatMapLatest 는 새로운 value 가 발행되었을 때 { requestFlow(it) } 블락의 코드를 취소시킨다.

requestFlow 자체가 빠르고 suspending 이 아니라 취소될 수 없었다면 flatMapLatest 호출이 아무런 차이를 못 만들어냈을 것이다.








Flow exceptions


-

flow collection 이 emitter 나 operator 안의 코드로 인해 exception 으로 끝날 수 있다.



* Collector try and catch

fun foo(): Flow<Int> = flow {
    for ( i in 1..3 ){
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    try{
        foo().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" } // check 는 조건이 맞지 않으면 IllegalStateException 을 내는 kotlin top level function
        }
    } catch (e:Throwable){
        println("Caught $e")
    }
}

// Emitting 1
// 1
// Emitting 2
// 2
// Caught java.lang.IllegalStateException: Collected 2

성공적으로 caught 되었고, 더 이상의 값이 emit 되지 않았다.



* Everything is caught


-

앞의 예제는 emitter, intermediate, terminal operator 에서 발생한 모든 exception 을 catch 한다.

다음과 같이 코드를 짜면 약간 다른 코드를 볼 수 있다.

fun foo(): Flow<String> = flow{
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}.map { value ->
    check(value <= 1) { "Crashed on $value" }
    "string $value"
}

fun main() = runBlocking<Unit> {
    try{
        foo().collect { value -> println(value) }
    } catch (e:Throwable) {
        println("Caught $e")
    }
}

// Emitting 1
// string 1
// Emitting 2
// Caught java.lang.IllegalStateException: Crashed on 2

역시나 발행을 멈췄다.





Exception transparency


-

emitter 의 코드가 어떻게 예외 처리 로직을 encapsulate 할 수 있을까?

Flows 는 반드시 exception 에 대해 투명해야 한다.

그리고 flow { ... } builder code 안의 try/catch block 에서 value 를 emit 하는 것은 exception transparency 에 위배된다.

투명한 예외는 collector 가 항상 catch 로 exception 을 잡을 수 있음을 보장한다.


emitter 가 exception transparency 를 위해 catch operator 를 사용할 수 있으며, 이를 통해 예외 처리를 은닉할 수 있다.

catch operator 코드에는 exception 을 분석한 후 다른 방식으로 반응하는 로직이 들어가 있다.

    Exception 이 throw 를 통해 다시 던져질 수 있다.

    Exception 이 emit 을 이용해서 값의 발행으로 이어질 수 있다.

    Exception 이 무시되거나, 로그를 찍거나, 다른 코드에 의해 처리될 수 있다.

fun foo(): Flow<String> = flow{
    for(i in 1..3){
        println("Emitting $i")
        emit(i)
    }
}
.map { value ->
    check(value <= 1) { "Crashed on $value" }
    "string $value"
}

fun main() = runBlocking<Unit>{
    foo()
        .catch { e -> emit("Caught $e") }
        .collect { value -> println(value) }
}

// Emitting 1
// string 1
// Emitting 2
// Caught java.lang.IllegalStateException: Crashed on 2


* Transparent catch


-

"catch" operator 는 예외 투명성을 존중하며, upstream 의 exception 을 catch 한다

만약 catch 보다 아래 호출된 collect { ... } 에서 exception 이 나면 catch 는 이를 잡지 못한다.



* Catching declaratively


-

"catch" operator 의 선언적 특성을 모든 exception 처리에 대한 욕구과 조합시킬 수 있다.

이는 collect operator 의 body 를 onEach 에 위치시키고, onEach 를 catch 보다 먼저 호출하는 것이다.

fun foo(): Flow<String> = flow{
    for(i in 1..3){
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
}




Flow completion


-

flow collection 이 끝났을 때 (정상적으로 또는 예외적으로) 특정 action 을 수행하고 싶을 수 있다.

이는 명령적 방법과 선언적 방법 두가지 방법으로 수행할 수 있다.



* Imperative finally block


-

try/catch 를 추가하는 대신 collector 는 finally block 을 이용해서 collect 가 끝난 후 액션을 취할 수 있다.

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}            

// 1
// 2
// 3
// Done


* Declarative handling


-

"onCompletion" intermediate operator 를 사용하여 collect 가 끝났을 때 action 을 취할 수 있다.

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
}            

// 1
// 2
// 3
// Done

onCompletion 의 강정은 lambda 의 Throwable parameter 가 nullable 이라는 것이다.

그래서 flow collection 이 정상적으로 끝났는지, 예외적으로 끝났는지를 알 수 있다.

fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}

// 1
// Flow completed exceptionally
// Caught exception

onComplete operator 는 catch 와 달리 exception 을 다루지 않는다.

발생한 exception 은 그대로 downstream 으로 전달된다.



* Upstream exceptions only


-

catch operator 와 같이 onCompletion 은 exception 이 upstream 으로부터 오는 것을 볼 수 있고, downstream 에 대해서는 볼 수 없다.

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

// 1
// Flow completed with null
// Exception in thread "main" java.lang.IllegalStateException: Collected 2


* Imperative versus declarative


-

imperative 와 declarative 중 어떤 접근법이 좋은가? 그리고 그 이유는 무엇인가?

library 로서 어떤 접근법이 우세하다고 지지하기 어렵다. 그저 본인의 스타일에 맞게 잘 선택해 쓰시길...





Launching flow


-

flow 로 특정 source 로부터 오는 async event 를 표현하는 것은 쉽다.

이는 addEventListener 와 비슷하게 code 를 등록하고, event 가 왔을 때 작업을 수행하면 된다.

onEach operator 가 이 역할을 한다.

하지만 onEach 는 intermediate operator 이다.

우리는 flow 를 collect 하기 위해 terminal operator 도 필요하다.

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect()
    println("Done")
}

// Event: 1
// Event: 2
// Event: 3
// Done

"launchIn" terminal operator 는 훨씬 간편하게 이 작업을 할 수 있다.

collect 대신 launchIn 을 사용하면, 각각의 flow collection 을 다른 coroutine 에서 처리할 수 있다.

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this)
    println("Done")
}

// Done
// Event: 1
// Event: 2
// Event: 3

launchIn 에 요구되는 param 은 반드시 flow 를 collect 할 coroutine 이 launch 될 CoroutineScope 를 명시해야 한다.

위의 예제에서는 scope 는 runBlocking 에서부터 왔다. 그래서 flow 가 도는동안 runBlocking scope 은 child coroutine 이 종료될 때까지 기다린 후 main function 이 종료된다.


실제 앱에서 scope 은 보통 lifetime 이 한정적인 녀석으로 전달된다.

lifetime 이 종료될 때 해당 scope 도 취소되고, 관련된 flow의 collection 도 취소된다.

onEach{ ... }.launchIn(scope) 조합이 addEventListener 처럼 작동한다.

하지만 removeEventListener 가 필요가 없게 된다. 왜냐하면 structured concurrency 로 인해 자동으로 취소되기 때문이다.



-

launchIn 은 Job 을 return 하는 것을 눈여겨보자.

이 job 은 전체 scope 을 취소하는 대신 단일 job 을 취소되는 데 사용될 수 있고, join 하는 데도 사용될 수 있다.





Flow and Reactive Streams


-

RxJava 와 Project Reactor 등의 Reactive Streams 이나 framework 에 친숙하다면 Flow 도 이와 비슷하다는 것을 느꼈을 것이다.

사실 flow 의 design 은 Reactive Stream 과 그 구현에서 영감을 받았다.

그러나 Flow 의 주 목적은 되도록 간단하게 만드는 것이다.

Kotlin 으로 되어야 하고, suspension 과 잘 조화가 되어야 하며, structured concurrency 도 적용되도록 하는 것이었다.

이는 reactive 선구자들과 그들의 업적 없이는 불가능했을 것이다.



-

다르긴 하지만 개념적으로 Flow 는 reactive stream 이고, reactive Publisher 로 상호 전환될 수 있다.

해당 converter 는 kotlinx.coroutines 바깥에서 제공되며, kotlinx-coroutines-reactive 는 Reactive stream 용, kotlinx-coroutines-reactor 는 Project Reactor 용, 그리고 kotlinx-coroutines-rx2 는 RxJava2 를 위해 제공된다.




참고자료

https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/flow.md#asynchronous-flow




반응형

댓글