Representing multiple values
-
복수개의 값은 kotlin 에서 collection 을 사용하여 표현할 수 있다.
fun foo() : List<Int> = listOf(1,2,3) fun main() { foo.forEach { value -> println(value) } } // 1 // 2 // 3
* Sequences
-
어떤 숫자를 계산하는데 CPU 를 blocking 하는 code 를 사용하고, 그 과정에 각각 100ms 가 걸린다면 우리는 이 숫자들을 Sequence 로 표현할 수 있다.
fun foo() : Sequence<Int> = sequence{ // this 는 SequenceScope 이다. Sequence 는 coroutine 이 아닌 kotlin 의 것! for ( i in 1..3 ){ Thread.sleep(100) // 계산하는 척하는 코드. coroutineScope 이 아니라 delay 는 사용 불가능하다. yield(i) // return value to iterator & and suspend until next value requested. suspend func 이다. } } fun main(){ foo().forEach{ value -> println(value) } } // 1 // 2 // 3
결과는 동일하지만 각 출력 사이에 100ms delay 가 있다.
* Suspending functions
-
계산하는 로직은 main thread 에서 돌게 된다.
이 값들을 async 로 계산하려면, foo 함수에 suspend modifier 를 붙여야 한다.
그래서 blocking 없이 값들을 return 할 수 있게 해야 한다.
suspend fun foo(): List<Int> { delay(1000) return listOf(1,2,3) } fun main() = runBlocking<Unit>{ foo().forEach { value -> println(value) } } // 1 // 2 // 3
결과는 같지만 각 결과를 1초 후에 찍는다.
* Flows
-
List<Int> result type 을 사용한다는 것은 값들을 한번에 발행한다는 이야기이다.
value 를 stream 으로 표현하면서 async 로 계산하려면 우리는 Flow<Int> 를 사용해야 한다.
sync 계산 값인 Sequence<Int> 를 사용했던 것처럼..
fun foo() : Flow<Int> = flow{ // this 는 FlowCollector, interface 에 emit 함수 하나만 정의되어 있다. for( i in 1..3){ delay(100) emit(i) } } fun main() = runBlocking<Unit>{ launch{ for(k in 1..3){ println("I'm not blocked $k") delay(100) } } foo().collect { value -> println(value) } // collect 는 suspend function 이다. } // I'm not blocked 1 // 1 // I'm not blocked 2 // 2 // I'm not blocked 3 // 3
앞의 예제와 Flow 의 다른 점을 주목해보자.
Flow type 에 대한 builder function 을 "flow" 이다.
flow 안의 코드는 suspend 될 수 있다.
foo() 함수는 더 이상 suspend modifier 로 마킹할 필요가 없다.
"emit" 함수를 통해 값이 emit 된다.
"collect" 함수를 통해 값이 collect 된다.
Flows are cold
-
Flow 는 sequence 와 비슷하게 cold stream 이다.
flow builder 는 flow 가 collected 되기 전까지 불리지 않는다.
fun foo() : Flow<Int> = flow{ println("Flow Started") for(i in 1..3){ delay(100) emit(i) } } fun main() = runBlocking<Unit>{ println("Calling foo...") val flow = foo() println("Calling collect...") flow.collect { value -> println(value) } println("Calling collect again...") flow.collect { value -> println(value) } } // Calling foo... // Calling collect... // Flow Started // 1 // 2 // 3 // Calling collect again... // Flow Started // 1 // 2 // 3
foo() 함수 자체는 빠르게 return 하고 아무것도 기다리지 않는다. 그래서 suspend 를 붙일 필요가 없는 것이다.
flow 는 collect 가 불릴때마다 다시 시작한다. 그래서 collect 를 다시 불렀을 때 "Flow started" 를 다시 볼 수 있는 것이다.
Flow cancellation
-
Flow 는 coroutine 의 취소와 일반적으로 협조한다.
flow 의 내부 구조는 추가적인 취소 포인트를 도입하지 않았다.
이것은 취소에 대해 완전히 투명하다.
일반적으로 flow collection 은 flow 가 취소 가능한 delay 같은 suspending function 을 불러 suspend 되었을 때만 취소될 수 있다. 그렇지 않으면 취소될 수 없다. ( 돼왕 : 결론은 flow 에 대한 취소는 suspending function 에 대한 취소만 가능하다는 말 )
fun foo() : Flow<Int> = flow{ for(i in 1..3){ delay(100) println("Emitting $i") emit(i) } } fun main() = runBlocking<Unit>{ withTimeoutOrNull(250) { foo().collect{ value -> println(value) } } println("Done") } // Emitting 1 // 1 // Emitting 2 // 2 // Done
Flow builders
-
flow{ ... } 은 가장 기본적인 녀석이다.
다음과 같이 더 쉬운 flow 정의도 있다.
"flowOf" builder 를 사용하여 고정된 value set 을 emit 할 수 있다.
여러가지 collection 과 sequence 가 .asFlow() extension function 을 통해 전환될 수 있다.
fun main() = runBlocking<Unit>{ (1..3).asFlow().collect { value -> println(value) } } // 1 // 2 // 3
Intermediate flow operators
-
flow 는 operator 로 transform 될 수 있다.
intermediate(중간) operator 들은 upstream flow 를 받아 downstream flow 를 return 된다.
이 operator 들도 cold 이다.
해당 operator 들의 호출은 suspending function 이 아니다.
이것은 transformed flow 로 빠른 return 을 한다.
-
기본적인 operator 로는 map, filter 와 같은 친숙한 녀석들이 있다.
sequence 와의 중요한 차이점은 operator 함수 안의 block 은 suspending function 을 호출할 수 있다는 것이다.
suspend fun performRequest(request:Int):String{ delay(1000) return "response $request" } fun main() = runBlocking<Unit>{ (1..3).asFlow() .map { request -> performRequest(request) } .collect { response -> println( response ) } } // response 1 // response 2 // response 3
* Transform operator
-
flow 변환 operator 중에서 가장 일반적인 것은 "transform" 이라고 불린다.
map, filter 와 같은 간단한 변환을 수행할 수도 있고, 아주 복잡한 변환을 할 수도 있다.
transform operator 를 사용해서 우리는 임의의 값과 임의의 횟수를 emit 할 수 있다.
suspend fun performRequest(request:Int):String{ delay(1000) return "response $request" } fun main() = runBlocking<Unit>{ (1..3).asFlow() .transform { request -> emit("Making request $request") emit(performRequest(request)) } .collect { response -> println( response ) } } // Making request 1 // respone 1 // Making request 2 // respone 2 // Making request 3 // respone 3
* Size-limiting operators
-
"take" 와 같은 size 제한 중간 operator 들은 명시된 제한에 도달하면 flow 의 수행을 cancel 시킨다.
coroutine 의 취소는 언제나 exception 을 던지는 것으로 되기 때문에 try{ ... } finally { ... } 와 같은 res 관리 코드들이 잘 들어가야 한다.
fun numbers(): Flow<Int> = flow { try{ emit(1) // emit 은 suspend function 이다. emit(2) println("This line will not execute") emit(3) } finally{ println("Finally in numbers") } // 취소시 AbortFlowException 가 발생하며 이는 CancellationException 의 subclass 이다. } fun main() = runBlocking<Unit> { numbers() .take(2) .collect { value -> println(value) } } // 1 // 2 // Finally in numbers
돼왕 : 결과를 보아하니 limit opreator 가 있는 경우 emit 함수 안에서 값 발행 후 limit 조건이 충족하면 AbortFlowException 을 던지는 것으로 보인다. 그래서 This line will not execute 가 불리지 않은 것.
Terminal flow operators
-
Terminal operator 는 suspending function 으로 flow 를 수행시킨다.
"collect" operator 가 가장 기본적인 녀석이며, 다른 terminal operator 들도 있다.
"toList", "toSet" 과 같은 collection 전환 operator
"first" 를 통해 첫번째 값만 취할 수도 있고, "single" 을 통해 유일한 값을 취할 수도 있다.
"reduce" 나 "fold" 를 통해 flow 를 reducing 할 수도 있다.
fun main = runBlocking<Unit>{ val sum = (1..5).asFlow() .map { it * it } .reduce { a, b -> a + b } println(sum) } // 55
Flows are sequential
-
각각의 flow collection 은 특별한 operator 를 명시하지 않는 한 순차적으로 수행된다.
collection 작업은 terminal operator 가 수행되는 순간 coroutine 속에서 수행된다.
기본적으로는 새로운 coroutine 이 launch 되지 않는다.
각각의 값 emit 은 intermediate operator 에서 upstream 을 받아 downstream 으로 전환하며, 이것이 terminal operator 에 최종적으로 전달된다.
fun main() = runBlocking<Unit>{ (1..5).asFlow() .filter{ println("Filter $it") it %2 == 0 } .map{ println("Map $i") "string $it" } .collect{ println("Collect $it") } } // Filter 1 // Filter 2 // Map 2 // Collect string 2 // Filter 3 // Filter 4 // Map 4 // Collect string 4 // Filter 5
Flow context
-
flow 의 collection 은 항상 호출하는 coroutine 의 context 에서 수행된다.
withContext(context){ foo.collect { value -> println(value) // 명시된 context 에서 수행된다. } }
이런 flow 의 성질을 "context preservation(context 보존)" 이라 부른다.
-
기본적으로 flow { ... } builder 안의 코드는 전달받은 context 에서 수행된다.
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") fun foo(): Flow<Int> = flow{ log("Started foo flow") for (i in 1..3) { emit(i) } } fun main() = runBlocking<Unit>{ foo().collect { value -> log("Collected $value") } } // [main @coroutine#1] Started foo flow // [main @coroutine#1] Collected 1 // [main @coroutine#1] Collected 2 // [main @coroutine#1] Collected 3
foo().collect 가 main thread 에서 불렸기 때문에, foo 의 body 역시 main thread 에서 불린다.
이것이 빠르게 실행하고, caller 를 block 하지 않으면서도 수행 context 를 신경쓰지 않는 가장 완벽한 기본이다.
* Wrong emission withContext
-
CPU 를 소비하는 코드 오래 동작하는 코드들은 Dispatchers.Default context 에서 실행되야 할 수 있으며, UI-updating code 는 Dispatchers.Main 에서 수행되어야 한다.
일반적으로 kotlin coroutine 에서는 withContext 가 context change 를 위해 사용된다.
그러나 flow { ... } builder 는 context preservation 속성을 존중해야 하므로 다른 context 에서 emit 하는 것을 허락하지 않는다.
fun foo(): Flow<Int> = flow{ withContext(Dispatchers.Default){ for (i in 1..3) { Thread.sleep(100) emit(i) } } fun main() = runBlocking<Unit>{ foo().collect { value -> println(value) } }
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
* flowOn operator
-
예외에서 알려준데로, flowOn 함수가 emit 하는 context 를 변경할 수 있다.
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") fun foo(): Flow<Int> = flow{ for (i in 1..3) { Thread.sleep(100) log("Emitting $i") emit(i) } }.flowOn(Dispatchers.Default) fun main() = runBlocking<Unit>{ foo().collect { value -> log("Collected $value") } } // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1 // [main @coroutine#1] Collected 1 // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2 // [main @coroutine#1] Collected 2 // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3 // [main @coroutine#1] Collected 3
collection 은 main thread 에서 수행되지만, flow { ... } 가 bg thread 에서 작동함을 눈여겨보자.
또 하나 눈여겨 볼 것은 flowOn operator 가 순차적인 수행도 변경했다는 것이다. ( 돼왕 : coroutine#1, #2 순서가 아니다. )
collection 은 coroutine#1 에서 수행되었지만, emit 은 coroutine #2 에서 수행되었다.
flowOn 이 새로운 upstream flow 를 위해 제공된 CourinteDispatcher를 사용하여 새로운 coroutine 을 만든 것이다.
Buffering
-
flow 의 다른 파트들을 다른 coroutine 에서 돌리는 것은 flow 를 collect 하는데 걸리는 전체 시간을 줄이는 측면에서 유용하다.
특히 오래 걸리는 async operation 일 경우 그렇다.
fun foo(): Flow<Int> = flow{ for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit>{ val time = measureTimeMillis { foo().collect { value -> delay(300) println(value) } } println("Collected in $time ms") } // 1 // 2 // 3 // Collected in 1220ms
각각의 숫자를 출력하는데 400ms 정도를 사용했다.
( 돼왕 : main thread 하나로만 동작하기 때문에 collect 가 다음 숫자를 요청할 때까지 flow 의 다음 코드는 돌지 않았다. emit 에서 멈춰있었다고 보면 되겠다. )
flow 에 buffer 를 적용함으로써 병렬적으로 value 를 collect 할 수 있다.
fun foo(): Flow<Int> = flow{ for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit>{ val time = measureTimeMillis { foo() .buffer() .collect { value -> delay(300) println(value) } } println("Collected in $time ms") } // 1 // 2 // 3 // Collected in 1071 ms
collect 에서는 실제로는 첫번째 100ms 만 기다리고, 다른 각각의 숫자를 처리하는데 300ms 만 썼다.
( 돼왕 : 앞선 예제에서는 collect 가 다음 요청을 할 때까지 emit 에서 멈춰있었다면, buffer 가 적용된 후에는 emit 에서 멈춰 있지 않고, flow logic 이 계속 돌아서 buffer 에 계속 값을 쌓아놓았다고 볼 수 있다. )
눈여겨 볼 것은 flowOn operator 도 CoroutineDispatcher 를 변경해야만 할 때 동일한 buffering mechanism 을 사용한다.
여기서는 context change 없이 하기 위해 buffering 을 사용했다고 볼 수 있다.
* Conflation
-
flow 가 부분적인 결과를 보여줄 필요가 있거나, 작업의 현재 상태만 update 하는 것이 필요하다면, 각각의 값을 처리할 필요가 없다.
대신 가장 최신의 것만 유지하면 된다. collector 가 값을 처리하는게 너무 느릴 때도 유효할 수 있다.
이 때는 "conflate" operator 를 사용해서 중간값들을 생략할 수 있다.
fun foo(): Flow<Int> = flow{ for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit>{ val time = measureTimeMillis { foo() .conflate() .collect { value -> delay(300) println(value) } } println("Collected in $time ms") } // 1 // 3 // Collected in 758ms
첫번째 숫자가 처리되는 동안, 세번째 숫자가 생성되었다. 그래서 두번째 숫자가 conflated 되었고, 가장 최근 값인 세번째 값이 collector 에 전달되었다.
돼왕 : Conflate 는 '융합(합체)하다'라는 뜻이다.
* Processing the latest value
-
Conflation 은 emitter 와 collector 모두가 느릴 때 처리를 빠르게 하는 방법 중 하나이다.
중간의 emit 된 value 를 drop 시킨다.
다른 방법은 느린 collector 를 취소시키고 새로운 값이 emit 될 때마다 restart 시키는 방법이다.
xxxLatest 라는 operator 시리즈가 있는데, xxx 의 기본 로직을 수행하지만, 새로운 value 에 대한 block 이 있다면 cancel 을 시켜버린다.
fun foo(): Flow<Int> = flow{ for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit>{ val time = measureTimeMillis { foo() .collectLatest { value -> println("Collecting $value") delay(300) println("Done $value") } } println("Collected in $time ms") } // Collecting 1 // Collecting 2 // Collecting 3 // Done 3 // Collected in 741ms
Composing multiple flows
* Zip
-
Sequence.zip 과 같이 flows 도 zip 을 가지고 있다.
fun main() = runBlocking<Unit> { val nums = (1..3).asFlow() val strs = flowOf("one", "two", "three") nums.zip(strs) { a, b -> "$a -> $b" } .collect { println( it ) } } // 1 -> one // 2 -> two // 3 -> three
* Combine
-
flow 가 가장 최신 값을 표시하는 경우, 가장 최근 값을 바탕으로 계산이 이루어져야 할 수 있다.
이것을 위한 것이 combine 이다.
예를 들어 한 숫자가 매 300ms 마다 업데이트가 되고 string 이 400ms 마다 업데이트가 된다면,
이들을 묶는 것은 zip 은 여전히 동일한 결과를 낸다. 다만, 매 400ms 마다 값이 출력된다는 차이만 있다.
fun main() = runBlocking<Unit> { val nums = (1..3).asFlow().onEach { delay(300) } val strs = flowOf("one", "two", "three").onEach { delay(400) } val startTime = System.currentTimeMillis() nums.zip(strs) { a, b -> "$a -> $b" } .collect { value -> println( "$value at ${System.currentTimeInMillis() - startTime} ms from start" ) } } // 1 -> one at 410 ms from start // 2 -> two at 820 ms from start // 3 -> three at 1230 ms from start
하지만 여기서 zip 대신 combine 으로 바꾼다면 결과가 이렇게 달라진다.
// 1 -> one at 452 ms from start // 2 -> one at 651 ms from start // 2 -> two at 854 ms from start // 3 -> two at 952 ms from start // 3 -> three at 1256 ms from start
Flattening flows
-
flow 는 sequence of value 를 async 로 받을 수 있는데, 이는 각각의 값이 다른 sequence of value 를 return 하는 경우를 아주 쉬운 예로 들 수 있다. ( 돼왕 : flow 가 flow 를 value 로 emit 할 수 있다는 의미이다. )
fun requestFlow(i:Int) : Flow<String> = flow { emit("$i: First") delay(500) emit("$i: Second") } (1..3).asFlow().map{ requestFlow(it) }
이렇게 하면 Flow<Flow<String>> 이 반환되는데, single flow 로 flatten (평평하게 하다.) 할 필요가 있을 수 있다.
Collection 과 sequence 는 flatten 과 flatMap operator 가 있다.
하지만 flow 의 async 의 특성상 flatten 은 추가적인 모드를 지원한다.
* flatMapConcat
-
flatMapConcat, flattenConcat operator 를 통해서 concat 을 할 수 있다.
sequence 에 대한 operator 와 아주 비슷하다.
이들은 collect 를 하기 전에 inner flow 의 결과를 기다린다.
fun requestFlow(i:Int) : Flow<String> = flow { emit("$i: First") delay(500) emit("$i: Second") } fun main() = runBlocking<Unit> { val startTime = System.currentTimeMillis() (1..3).asFlow().onEach { delay(100) } .flatMapConcat { requestFlow(it) } .collect { value -> println("$value at ${System.currentTimeMillis() - startTime} ms from start") } } // 1: First at 121 ms from start // 1: Second at 622 ms from start // 2: First at 727 ms from start // 2: Second at 1227 ms from start // 3: First at 1328 ms from start // 3: Second at 1829 ms from start
* flatMapMerge
-
flatten 의 다른 모드는 incoming flow 를 병렬적으로 모은 후에 single flow 로 머지하는 것이다.
그래서 value 가 바로바로 emit 될 수 있도록 한다.
이는 flatMapMerge 와 flattenMerge opreator 를 통해 수행된다.
이들은 optional 한 concurrency param 을 받는데, 이것이 동시에 돌 수 있는 concurrent flow 갯수를 limit 을 결정한다.
DEFAULT_CONCURRENCY 가 기본값이다.
fun requestFlow(i:Int) : Flow<String> = flow { emit("$i: First") delay(500) emit("$i: Second") } fun main() = runBlocking<Unit> { val startTime = System.currentTimeMillis() (1..3).asFlow().onEach { delay(100) } .flatMapMerge { requestFlow(it) } .collect { value -> println("$value at ${System.currentTimeMillis() - startTime} ms from start") } } // 1: First at 136 ms from start // 2: First at 231 ms from start // 3: First at 333 ms from start // 1: Second at 639 ms from start // 2: Second at 732 ms from start // 3: Second at 833 ms from start
flatMapMerge 는 { requestFlow(it) } 코드를 순차적으로 불렀지만, 결과가 되는 flow 는 병렬적으로 값을 내뱉었다.
이는 map 을 먼저 부르고 flattenMerge 를 그 다음에 부르는 것과 동일하다.
* flatMapLatest
-
collectLatest operator 와 비슷하게 Latest 로 flattening 하는 mode 가 있다.
이 녀석은 collector 가 처리가 느릴 경우 cancel 시키고 새 값을 emit 시킨다.
fun requestFlow(i:Int) : Flow<String> = flow { emit("$i: First") delay(500) emit("$i: Second") } fun main() = runBlocking<Unit> { val startTime = System.currentTimeMillis() (1..3).asFlow().onEach { delay(100) } .flatMapLatest { requestFlow(it) } .collect { value -> println("$value at ${System.currentTimeMillis() - startTime} ms from start") } } // 1: First at 139 ms from start // 2: First at 247 ms from start // 3: First at 350 ms from start // 3: Second at 850 ms from start
flatMapLatest 는 새로운 value 가 발행되었을 때 { requestFlow(it) } 블락의 코드를 취소시킨다.
requestFlow 자체가 빠르고 suspending 이 아니라 취소될 수 없었다면 flatMapLatest 호출이 아무런 차이를 못 만들어냈을 것이다.
Flow exceptions
-
flow collection 이 emitter 나 operator 안의 코드로 인해 exception 으로 끝날 수 있다.
* Collector try and catch
fun foo(): Flow<Int> = flow { for ( i in 1..3 ){ println("Emitting $i") emit(i) } } fun main() = runBlocking<Unit> { try{ foo().collect { value -> println(value) check(value <= 1) { "Collected $value" } // check 는 조건이 맞지 않으면 IllegalStateException 을 내는 kotlin top level function } } catch (e:Throwable){ println("Caught $e") } } // Emitting 1 // 1 // Emitting 2 // 2 // Caught java.lang.IllegalStateException: Collected 2
성공적으로 caught 되었고, 더 이상의 값이 emit 되지 않았다.
* Everything is caught
-
앞의 예제는 emitter, intermediate, terminal operator 에서 발생한 모든 exception 을 catch 한다.
다음과 같이 코드를 짜면 약간 다른 코드를 볼 수 있다.
fun foo(): Flow<String> = flow{ for (i in 1..3) { println("Emitting $i") emit(i) } }.map { value -> check(value <= 1) { "Crashed on $value" } "string $value" } fun main() = runBlocking<Unit> { try{ foo().collect { value -> println(value) } } catch (e:Throwable) { println("Caught $e") } } // Emitting 1 // string 1 // Emitting 2 // Caught java.lang.IllegalStateException: Crashed on 2
역시나 발행을 멈췄다.
Exception transparency
-
emitter 의 코드가 어떻게 예외 처리 로직을 encapsulate 할 수 있을까?
Flows 는 반드시 exception 에 대해 투명해야 한다.
그리고 flow { ... } builder code 안의 try/catch block 에서 value 를 emit 하는 것은 exception transparency 에 위배된다.
투명한 예외는 collector 가 항상 catch 로 exception 을 잡을 수 있음을 보장한다.
emitter 가 exception transparency 를 위해 catch operator 를 사용할 수 있으며, 이를 통해 예외 처리를 은닉할 수 있다.
catch operator 코드에는 exception 을 분석한 후 다른 방식으로 반응하는 로직이 들어가 있다.
Exception 이 throw 를 통해 다시 던져질 수 있다.
Exception 이 emit 을 이용해서 값의 발행으로 이어질 수 있다.
Exception 이 무시되거나, 로그를 찍거나, 다른 코드에 의해 처리될 수 있다.
fun foo(): Flow<String> = flow{ for(i in 1..3){ println("Emitting $i") emit(i) } } .map { value -> check(value <= 1) { "Crashed on $value" } "string $value" } fun main() = runBlocking<Unit>{ foo() .catch { e -> emit("Caught $e") } .collect { value -> println(value) } } // Emitting 1 // string 1 // Emitting 2 // Caught java.lang.IllegalStateException: Crashed on 2
* Transparent catch
-
"catch" operator 는 예외 투명성을 존중하며, upstream 의 exception 을 catch 한다
만약 catch 보다 아래 호출된 collect { ... } 에서 exception 이 나면 catch 는 이를 잡지 못한다.
* Catching declaratively
-
"catch" operator 의 선언적 특성을 모든 exception 처리에 대한 욕구과 조합시킬 수 있다.
이는 collect operator 의 body 를 onEach 에 위치시키고, onEach 를 catch 보다 먼저 호출하는 것이다.
fun foo(): Flow<String> = flow{ for(i in 1..3){ println("Emitting $i") emit(i) } } fun main() = runBlocking<Unit> { foo() .onEach { value -> check(value <= 1) { "Collected $value" } println(value) } .catch { e -> println("Caught $e") } .collect() }
Flow completion
-
flow collection 이 끝났을 때 (정상적으로 또는 예외적으로) 특정 action 을 수행하고 싶을 수 있다.
이는 명령적 방법과 선언적 방법 두가지 방법으로 수행할 수 있다.
* Imperative finally block
-
try/catch 를 추가하는 대신 collector 는 finally block 을 이용해서 collect 가 끝난 후 액션을 취할 수 있다.
fun foo(): Flow<Int> = (1..3).asFlow() fun main() = runBlocking<Unit> { try { foo().collect { value -> println(value) } } finally { println("Done") } } // 1 // 2 // 3 // Done
* Declarative handling
-
"onCompletion" intermediate operator 를 사용하여 collect 가 끝났을 때 action 을 취할 수 있다.
fun foo(): Flow<Int> = (1..3).asFlow() fun main() = runBlocking<Unit> { foo() .onCompletion { println("Done") } .collect { value -> println(value) } } // 1 // 2 // 3 // Done
onCompletion 의 강정은 lambda 의 Throwable parameter 가 nullable 이라는 것이다.
그래서 flow collection 이 정상적으로 끝났는지, 예외적으로 끝났는지를 알 수 있다.
fun foo(): Flow<Int> = flow { emit(1) throw RuntimeException() } fun main() = runBlocking<Unit> { foo() .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") } .catch { cause -> println("Caught exception") } .collect { value -> println(value) } } // 1 // Flow completed exceptionally // Caught exception
onComplete operator 는 catch 와 달리 exception 을 다루지 않는다.
발생한 exception 은 그대로 downstream 으로 전달된다.
* Upstream exceptions only
-
catch operator 와 같이 onCompletion 은 exception 이 upstream 으로부터 오는 것을 볼 수 있고, downstream 에 대해서는 볼 수 없다.
fun foo(): Flow<Int> = (1..3).asFlow() fun main() = runBlocking<Unit> { foo() .onCompletion { cause -> println("Flow completed with $cause") } .collect { value -> check(value <= 1) { "Collected $value" } println(value) } } // 1 // Flow completed with null // Exception in thread "main" java.lang.IllegalStateException: Collected 2
* Imperative versus declarative
-
imperative 와 declarative 중 어떤 접근법이 좋은가? 그리고 그 이유는 무엇인가?
library 로서 어떤 접근법이 우세하다고 지지하기 어렵다. 그저 본인의 스타일에 맞게 잘 선택해 쓰시길...
Launching flow
-
flow 로 특정 source 로부터 오는 async event 를 표현하는 것은 쉽다.
이는 addEventListener 와 비슷하게 code 를 등록하고, event 가 왔을 때 작업을 수행하면 된다.
onEach operator 가 이 역할을 한다.
하지만 onEach 는 intermediate operator 이다.
우리는 flow 를 collect 하기 위해 terminal operator 도 필요하다.
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) } fun main() = runBlocking<Unit> { events() .onEach { event -> println("Event: $event") } .collect() println("Done") } // Event: 1 // Event: 2 // Event: 3 // Done
"launchIn" terminal operator 는 훨씬 간편하게 이 작업을 할 수 있다.
collect 대신 launchIn 을 사용하면, 각각의 flow collection 을 다른 coroutine 에서 처리할 수 있다.
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) } //sampleStart fun main() = runBlocking<Unit> { events() .onEach { event -> println("Event: $event") } .launchIn(this) println("Done") } // Done // Event: 1 // Event: 2 // Event: 3
launchIn 에 요구되는 param 은 반드시 flow 를 collect 할 coroutine 이 launch 될 CoroutineScope 를 명시해야 한다.
위의 예제에서는 scope 는 runBlocking 에서부터 왔다. 그래서 flow 가 도는동안 runBlocking scope 은 child coroutine 이 종료될 때까지 기다린 후 main function 이 종료된다.
실제 앱에서 scope 은 보통 lifetime 이 한정적인 녀석으로 전달된다.
lifetime 이 종료될 때 해당 scope 도 취소되고, 관련된 flow의 collection 도 취소된다.
onEach{ ... }.launchIn(scope) 조합이 addEventListener 처럼 작동한다.
하지만 removeEventListener 가 필요가 없게 된다. 왜냐하면 structured concurrency 로 인해 자동으로 취소되기 때문이다.
-
launchIn 은 Job 을 return 하는 것을 눈여겨보자.
이 job 은 전체 scope 을 취소하는 대신 단일 job 을 취소되는 데 사용될 수 있고, join 하는 데도 사용될 수 있다.
Flow and Reactive Streams
-
RxJava 와 Project Reactor 등의 Reactive Streams 이나 framework 에 친숙하다면 Flow 도 이와 비슷하다는 것을 느꼈을 것이다.
사실 flow 의 design 은 Reactive Stream 과 그 구현에서 영감을 받았다.
그러나 Flow 의 주 목적은 되도록 간단하게 만드는 것이다.
Kotlin 으로 되어야 하고, suspension 과 잘 조화가 되어야 하며, structured concurrency 도 적용되도록 하는 것이었다.
이는 reactive 선구자들과 그들의 업적 없이는 불가능했을 것이다.
-
다르긴 하지만 개념적으로 Flow 는 reactive stream 이고, reactive Publisher 로 상호 전환될 수 있다.
해당 converter 는 kotlinx.coroutines 바깥에서 제공되며, kotlinx-coroutines-reactive 는 Reactive stream 용, kotlinx-coroutines-reactor 는 Project Reactor 용, 그리고 kotlinx-coroutines-rx2 는 RxJava2 를 위해 제공된다.
참고자료
https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/flow.md#asynchronous-flow
'프로그래밍 놀이터 > Kotlin, Coroutine' 카테고리의 다른 글
[coroutine] Shared mutable state and concurrency (0) | 2020.03.13 |
---|---|
[coroutine] Channels (5) | 2020.03.12 |
[coroutine] Composing Suspending Functions ( suspending 함수 만들기 ) (0) | 2020.03.10 |
[coroutine] Cancellation and Timeouts ( 취소와 타임아웃 ) (0) | 2020.03.09 |
[coroutine] coroutineScope 의 동작 특성을 알아보자. (0) | 2020.03.08 |
댓글