[coroutine] Channels
https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/channels.md#channels
Channel basics
-
Channel 은 BlockingQueue 와 매우 비슷한 개념이다.
한가지 큰 차이는 blocking 하는 put 대신 suspending function 인 send 가,
blocking 하는 take 대신 suspending function 인 receive 가 있다는 것이다.
fun main() = runBlocking{ val channel = Channel<Int>() launch{ for( x in 1..5) channel.send( x * x ) } repeat(5) { println( channel.receive() ) } println("Done!") } // 1 // 4 // 9 // 16 // 25 // Done!
Closing and iteration over channels
-
queue 와 달리 channel 은 더 이상의 element 가 없음을 close 로 표현할 수 있다.
이렇게 하면 receiver side 에서는 channel 을 통해 element 를 받기 위해 for loop 을 쉽게 적용할 수 있다.
개념상 close 는 특별한 close token 을 channel 에 보내는 것과 같다.
반복(iteration)은 close token 이 전달되는 순간 끝나고, close 가 받아지기 전 모든 보내진 element 를 받는 것은 보장된다.
fun main() = runBlocking{ val channel = Channel<Int>() launch{ for( x in 1..5) channel.send( x * x ) channel.close() } for( y in channel ) println(y) println("Done!") } // 1 // 4 // 9 // 16 // 25 // Done!
Building channel producers
-
coroutine 이 element sequence 를 만들어 내는 패턴은 아주 일반적이다.
producer-consumer pattern 은 동시성 코드에서 자주 찾아볼 수 있다.
이 producer 를 function 으로 만들 수 있고, channel 을 param 으로 받게 할 수 있다.
반면에 결과는 반드시 function 으로부터 return 되어야 한다.
produce 라는 coroutine builder 를 사용하면 쉽게 producer side 를 구현할 수 있다.
그리고 extension function 인 "consumeEach" 를 통해 consume 의 for loop 를 대체할 수 있다.
// produce block 의 return 이 channel 에 대한 close 를 불러준다고 이해하면 되겠다. fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce{ for (x in 1..5) send(x * x) } fun main() = runBlocking{ val squares = produceSquares() squares.consumeEach { println( it ) } println("Done!") } // 1 // 4 // 9 // 16 // 25 // Done!
Pipelines
-
pipeline 은 하나의 coroutine 이 produce 하는 경우에 사용하는 pattern 이다. (영구적으로 produce 할수도)
그리고 다른 coroutine(s) 가 해당 stream 을 consume 하면서 process 해서 또 다른 producer를 만들어내는 경우에 사용한다.
fun main() = runBlocking{ val numbers = produceNumbers() val squares = square(numbers) for (i in 1..5) println(squares.receive()) println("Done!") coroutineContext.cancelChildren() } fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) send(x++) } fun CoroutineScope.square(numbers: ReceiveChannel<Int>) : ReceiveChannel<Int> = produce{ for ( x in numbers ) send( x * x ) } // 1 // 4 // 9 // 16 // 25 // Done!
coroutine 을 만드는 모든 함수들은 CoroutineScope 의 extension 으로 정의되어 있다.
그래서 우리는 structured concurrency 를 사용하기 위해서 오래 지속되는 global coroutine 을 도입하지 않았다.
Prime numbers with pipeline
-
예제를 통해 pipeline 을 극단으로 써보자.
예제는 coroutine pipeline 을 이용해서 소수를 생성한다.
fun main() = runBlocking{ var cur = numbersFrom(2) for ( i in 1..10 ){ val prime = cur.receive() println(prime) cur = filter(cur, prime) } coroutineContext.cancelChildren() } fun CoroutineScope.numbersFrom(start:Int) = produce<Int>{ var x = start while(true) send(x++) } fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime:Int) = produce<Int>{ for(x in numbers) if (x % prime != 0) send(x) } // 2 // 3 // 5 // 7 // 11 // 13 // 17 // 19 // 23 // 29
iterator coroutine builder 를 사용해서 같은 pipeline 을 만들 수 있다.
produce 대신 iterator 로 교체하고, send 대신 yield, receive 대신 next, ReceiveChannel 대신 Iterator 를 사용하고, coroutine scope 를 제거하면 된다.
runBlocking 도 필요없어진다.
하지만 channel 을 사용한 pipeline 의 장점은 Dispatchers.Default context 를 사용하면 multi core 를 활용할 수 있다는 점이다.
어쨌든 위의 방법은 소수를 찾는 현실적인 방법은 아니다.
현실에서는 pipeline 이 suspending function 을 호출하기도 하는데, suspending functino 을 사용하는 pipeline 은 sequence 나 iterator 로 대체할 수 없다.
왜냐하면 그것들은 async 할 수 있는 produce 와는 다르게 suspension 을 허락하지 않기 때문이다.
Fan-out
-
여러개의 coroutine 이 동일한 channel 에서 값을 받을 수 있다.
이를 통해 일을 분배할 수 있다.
fun main() = runBlocking<Unit>{ val producer = produceNumbers() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() } fun CoroutineScope.produceNumbers() = produce<Int>{ var x = 1 while(true){ send(x++) delay(100) } } fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch{ for (msg in channel){ println("Processor #$id received $msg") } } // Processor #0 received 1 // Processor #0 received 2 // Processor #1 received 3 // Processor #2 received 4 // Processor #3 received 5 // Processor #4 received 6 // Processor #0 received 7 // Processor #1 received 8 // Processor #2 received 9 // Processor #3 received 10
위의 결과에서 id 의 순서는 바뀔 수 있다.
producer coroutine 의 취소는 channel 의 close 와 연결된다.
따라서 결국 channel 을 처리하는 iteration 도 종료된다.
그리고 channel 에 대한 for loop 가 fan-out 을 어떻게 처리하는지 iterate 가 어떻게 도는지 보고파악해 보자.
consumeEach 와 달리 for loop pattern 은 multiple coroutine 에 대해서 완벽하게 안전하다.
만약에 한 processor 의 coroutine 이 실패해도 다른 녀석들은 여전히 channel 을 처리한다.
반면에 consumeEach 로 하는경우는 보통의 혹은 에러로 종료되었을 때 관련된 channel 을 취소시킨다.
Fan-in
-
여러개의 coroutine 이 하나의 channel 에 보낼 수도 있다.
fun main() = runBlocking{ val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(6) { println(channel.receive()) } coroutineContext.cancelChildren() } suspend fun sendString(channel: SendChannel<String>, s:String, time:Long){ while(true){ delay(time) channel.send(s) } } // foo // foo // BAR! // foo // foo // BAR!
Buffered channels
-
지금까지 본 channel 은 buffer 가 없었다.
buffer 가 없는 channel 은 sender 와 receiver 가 서로 맞아야만 가능하다. (약속의 광장?)
만약 send 가 먼저 불리면, receive 가 불릴 때까지 suspend 하고,
만약 receive 가 먼저 불리면, send 가 불릴때까지 suspend 한다.
-
Channel() factory function 과 produce builder 는 capacity 라는 parameter 를 받는데 이는 buffer size 이다.
Buffer 는 BlockingQueue 가 capacity 를 가진 것처럼, sender 가 suspending 하기 전에 보낼 수 있고, buffer 가 다 차면 suspending 된다.
fun main() = runBlocking{ val channel = Channel<Int>(4) // buffer size 4 channel val sender = launch { repeat(10){ println("Sending $it") channel.send(it) // buffer 가 full 되기 전까지는 suspend 하지 않는다. } } delay(1000) sender.cancel() } // Sending 0 // Sending 1 // Sending 2 // Sending 3 // Sending 4
첫 4개의 element 는 buffer 에 담겨지고, 다섯번째 녀석을 보내기 전에 suspend 된다.
Channels are fair
-
channel 에 대한 send 와 receive 는 여러 코루틴으로부터의 호출로부터 순서적 입장에서 fair 하다.
선착순이다.
data class Ball(var hits: Int) fun main() = runBlocking{ val table = Channel<Ball>() launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) delay(1000) coroutineContext.cancelChildren() } suspend fun player(name:String, table:Channel<Ball>){ for (ball in table){ ball.hits++ println("$name $ball") delay(300) table.send(ball) } } // ping Ball(hits=1) // pong Ball(hits=2) // ping Ball(hits=3) // pong Ball(hits=4)
"ping" coroutine 이 먼저 시작되어서 먼저 ball 을 받는다.
"ping" coroutine 이 send 를 호출한 후 바로 다시 ball 받을 수도 있지만, ball 은 "pong" coroutine 으로 간다.
왜냐하면 "pong" 이 먼저 기다리고 있었고, channel 은 fair 하기 때문이다.
가끔 channel 이 사용되는 executor 의 종류에 따라 unfair 한듯한 동작을 하는 것처럼 보일 수 있다.
Ticker channels
-
Ticker channel 은 특별한 만남의 광장(?) channel 로 매번 마지막 consume time 으로부터 특정 delay 후에 Unit 을 만들어 낸다.
( 돼왕 : 아래 실험 결과를 보면 마지막 consume time 으로부터 특정 delay 가 아니라, 그냥 특정 delay 로 Unit 을 만들어내며, last value 에 접근할 수 있다. 기본값인 FIXED_PERIOD 의 역할이며, 글에서 설명하는 마지막 consume 시점으로부터의 time interval 은 FIXED_DLEAY 라고 볼 수 있다.)
이것 자체만 보면 별로 쓸모없어 보이지만, 복잡한 시간 기반 produce pipeline 과 windowing 을 하거나, 시간 기반 operator 를 만들 때 유용하다.
Ticker channel 은 select 에서 on tick 동작을 위해 사용될 수 있다.
ticker factory method 를 사용해서 해당 channel 을 만들 수 있다.
더 이상 사용하지 않을때는 ReceiveChannel.cancel 을 호출하면 된다.
fun main() = runBlocking{ val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Initial element is available immediately: $nextElement") nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } println("Next element is ready in 50 ms: $nextElement") nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } println("Next element is ready in 100 ms: $nextElement") println("Consumer pauses for 150ms") delay(150) nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Next element is available immediately after large consumer delay: $nextElement") nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } println("Next element is ready in 50 ms after consumer pause in 150ms: $nextElement") tickerChannel.cancel() } // Initial element is available immediately: kotlin.Unit // Next element is not ready in 50 ms: null // Next element is ready in 100 ms: kotlin.Unit // Consumer pauses for 150ms // Next element is available immediately after large consumer delay: kotlin.Unit // Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
ticker 는 consumer 의 pause 를 기본적으로 인지하고, 생성된 element 간의 fixed rate 를 유지하기 위해 다음 produced element 를 delay 하여 전달해준다.
optional param 인 mode 에 TickerMode.FIXED_DELAY 값을 줄 수 있는데, 이는 element 간의 fixed delay 를 유지해준다.
'프로그래밍 놀이터 > Kotlin, Coroutine' 카테고리의 다른 글
[kotlin] crossinline 에 대해 알아보자. (0) | 2020.08.05 |
---|---|
[coroutine] Shared mutable state and concurrency (0) | 2020.03.13 |
[coroutine] Asyncronous Flow (0) | 2020.03.11 |
[coroutine] Composing Suspending Functions ( suspending 함수 만들기 ) (0) | 2020.03.10 |
[coroutine] Cancellation and Timeouts ( 취소와 타임아웃 ) (0) | 2020.03.09 |
댓글