[coroutine] Channels
https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/channels.md#channels
Channel basics
-
Channel 은 BlockingQueue 와 매우 비슷한 개념이다.
한가지 큰 차이는 blocking 하는 put 대신 suspending function 인 send 가,
blocking 하는 take 대신 suspending function 인 receive 가 있다는 것이다.
fun main() = runBlocking{
val channel = Channel<Int>()
launch{
for( x in 1..5) channel.send( x * x )
}
repeat(5) { println( channel.receive() ) }
println("Done!")
}
// 1
// 4
// 9
// 16
// 25
// Done!
Closing and iteration over channels
-
queue 와 달리 channel 은 더 이상의 element 가 없음을 close 로 표현할 수 있다.
이렇게 하면 receiver side 에서는 channel 을 통해 element 를 받기 위해 for loop 을 쉽게 적용할 수 있다.
개념상 close 는 특별한 close token 을 channel 에 보내는 것과 같다.
반복(iteration)은 close token 이 전달되는 순간 끝나고, close 가 받아지기 전 모든 보내진 element 를 받는 것은 보장된다.
fun main() = runBlocking{
val channel = Channel<Int>()
launch{
for( x in 1..5) channel.send( x * x )
channel.close()
}
for( y in channel ) println(y)
println("Done!")
}
// 1
// 4
// 9
// 16
// 25
// Done!
Building channel producers
-
coroutine 이 element sequence 를 만들어 내는 패턴은 아주 일반적이다.
producer-consumer pattern 은 동시성 코드에서 자주 찾아볼 수 있다.
이 producer 를 function 으로 만들 수 있고, channel 을 param 으로 받게 할 수 있다.
반면에 결과는 반드시 function 으로부터 return 되어야 한다.
produce 라는 coroutine builder 를 사용하면 쉽게 producer side 를 구현할 수 있다.
그리고 extension function 인 "consumeEach" 를 통해 consume 의 for loop 를 대체할 수 있다.
// produce block 의 return 이 channel 에 대한 close 를 불러준다고 이해하면 되겠다.
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce{
for (x in 1..5) send(x * x)
}
fun main() = runBlocking{
val squares = produceSquares()
squares.consumeEach { println( it ) }
println("Done!")
}
// 1
// 4
// 9
// 16
// 25
// Done!
Pipelines
-
pipeline 은 하나의 coroutine 이 produce 하는 경우에 사용하는 pattern 이다. (영구적으로 produce 할수도)
그리고 다른 coroutine(s) 가 해당 stream 을 consume 하면서 process 해서 또 다른 producer를 만들어내는 경우에 사용한다.
fun main() = runBlocking{
val numbers = produceNumbers()
val squares = square(numbers)
for (i in 1..5) println(squares.receive())
println("Done!")
coroutineContext.cancelChildren()
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++)
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) : ReceiveChannel<Int> = produce{
for ( x in numbers ) send( x * x )
}
// 1
// 4
// 9
// 16
// 25
// Done!
coroutine 을 만드는 모든 함수들은 CoroutineScope 의 extension 으로 정의되어 있다.
그래서 우리는 structured concurrency 를 사용하기 위해서 오래 지속되는 global coroutine 을 도입하지 않았다.
Prime numbers with pipeline
-
예제를 통해 pipeline 을 극단으로 써보자.
예제는 coroutine pipeline 을 이용해서 소수를 생성한다.
fun main() = runBlocking{
var cur = numbersFrom(2)
for ( i in 1..10 ){
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren()
}
fun CoroutineScope.numbersFrom(start:Int) = produce<Int>{
var x = start
while(true) send(x++)
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime:Int) = produce<Int>{
for(x in numbers) if (x % prime != 0) send(x)
}
// 2
// 3
// 5
// 7
// 11
// 13
// 17
// 19
// 23
// 29
iterator coroutine builder 를 사용해서 같은 pipeline 을 만들 수 있다.
produce 대신 iterator 로 교체하고, send 대신 yield, receive 대신 next, ReceiveChannel 대신 Iterator 를 사용하고, coroutine scope 를 제거하면 된다.
runBlocking 도 필요없어진다.
하지만 channel 을 사용한 pipeline 의 장점은 Dispatchers.Default context 를 사용하면 multi core 를 활용할 수 있다는 점이다.
어쨌든 위의 방법은 소수를 찾는 현실적인 방법은 아니다.
현실에서는 pipeline 이 suspending function 을 호출하기도 하는데, suspending functino 을 사용하는 pipeline 은 sequence 나 iterator 로 대체할 수 없다.
왜냐하면 그것들은 async 할 수 있는 produce 와는 다르게 suspension 을 허락하지 않기 때문이다.
Fan-out
-
여러개의 coroutine 이 동일한 channel 에서 값을 받을 수 있다.
이를 통해 일을 분배할 수 있다.
fun main() = runBlocking<Unit>{
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel()
}
fun CoroutineScope.produceNumbers() = produce<Int>{
var x = 1
while(true){
send(x++)
delay(100)
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch{
for (msg in channel){
println("Processor #$id received $msg")
}
}
// Processor #0 received 1
// Processor #0 received 2
// Processor #1 received 3
// Processor #2 received 4
// Processor #3 received 5
// Processor #4 received 6
// Processor #0 received 7
// Processor #1 received 8
// Processor #2 received 9
// Processor #3 received 10
위의 결과에서 id 의 순서는 바뀔 수 있다.
producer coroutine 의 취소는 channel 의 close 와 연결된다.
따라서 결국 channel 을 처리하는 iteration 도 종료된다.
그리고 channel 에 대한 for loop 가 fan-out 을 어떻게 처리하는지 iterate 가 어떻게 도는지 보고파악해 보자.
consumeEach 와 달리 for loop pattern 은 multiple coroutine 에 대해서 완벽하게 안전하다.
만약에 한 processor 의 coroutine 이 실패해도 다른 녀석들은 여전히 channel 을 처리한다.
반면에 consumeEach 로 하는경우는 보통의 혹은 에러로 종료되었을 때 관련된 channel 을 취소시킨다.
Fan-in
-
여러개의 coroutine 이 하나의 channel 에 보낼 수도 있다.
fun main() = runBlocking{
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) {
println(channel.receive())
}
coroutineContext.cancelChildren()
}
suspend fun sendString(channel: SendChannel<String>, s:String, time:Long){
while(true){
delay(time)
channel.send(s)
}
}
// foo
// foo
// BAR!
// foo
// foo
// BAR!
Buffered channels
-
지금까지 본 channel 은 buffer 가 없었다.
buffer 가 없는 channel 은 sender 와 receiver 가 서로 맞아야만 가능하다. (약속의 광장?)
만약 send 가 먼저 불리면, receive 가 불릴 때까지 suspend 하고,
만약 receive 가 먼저 불리면, send 가 불릴때까지 suspend 한다.
-
Channel() factory function 과 produce builder 는 capacity 라는 parameter 를 받는데 이는 buffer size 이다.
Buffer 는 BlockingQueue 가 capacity 를 가진 것처럼, sender 가 suspending 하기 전에 보낼 수 있고, buffer 가 다 차면 suspending 된다.
fun main() = runBlocking{
val channel = Channel<Int>(4) // buffer size 4 channel
val sender = launch {
repeat(10){
println("Sending $it")
channel.send(it) // buffer 가 full 되기 전까지는 suspend 하지 않는다.
}
}
delay(1000)
sender.cancel()
}
// Sending 0
// Sending 1
// Sending 2
// Sending 3
// Sending 4
첫 4개의 element 는 buffer 에 담겨지고, 다섯번째 녀석을 보내기 전에 suspend 된다.
Channels are fair
-
channel 에 대한 send 와 receive 는 여러 코루틴으로부터의 호출로부터 순서적 입장에서 fair 하다.
선착순이다.
data class Ball(var hits: Int)
fun main() = runBlocking{
val table = Channel<Ball>()
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0))
delay(1000)
coroutineContext.cancelChildren()
}
suspend fun player(name:String, table:Channel<Ball>){
for (ball in table){
ball.hits++
println("$name $ball")
delay(300)
table.send(ball)
}
}
// ping Ball(hits=1)
// pong Ball(hits=2)
// ping Ball(hits=3)
// pong Ball(hits=4)
"ping" coroutine 이 먼저 시작되어서 먼저 ball 을 받는다.
"ping" coroutine 이 send 를 호출한 후 바로 다시 ball 받을 수도 있지만, ball 은 "pong" coroutine 으로 간다.
왜냐하면 "pong" 이 먼저 기다리고 있었고, channel 은 fair 하기 때문이다.
가끔 channel 이 사용되는 executor 의 종류에 따라 unfair 한듯한 동작을 하는 것처럼 보일 수 있다.
Ticker channels
-
Ticker channel 은 특별한 만남의 광장(?) channel 로 매번 마지막 consume time 으로부터 특정 delay 후에 Unit 을 만들어 낸다.
( 돼왕 : 아래 실험 결과를 보면 마지막 consume time 으로부터 특정 delay 가 아니라, 그냥 특정 delay 로 Unit 을 만들어내며, last value 에 접근할 수 있다. 기본값인 FIXED_PERIOD 의 역할이며, 글에서 설명하는 마지막 consume 시점으로부터의 time interval 은 FIXED_DLEAY 라고 볼 수 있다.)
이것 자체만 보면 별로 쓸모없어 보이지만, 복잡한 시간 기반 produce pipeline 과 windowing 을 하거나, 시간 기반 operator 를 만들 때 유용하다.
Ticker channel 은 select 에서 on tick 동작을 위해 사용될 수 있다.
ticker factory method 를 사용해서 해당 channel 을 만들 수 있다.
더 이상 사용하지 않을때는 ReceiveChannel.cancel 을 호출하면 된다.
fun main() = runBlocking{
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement")
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
println("Next element is ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
println("Consumer pauses for 150ms")
delay(150)
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
println("Next element is ready in 50 ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel()
}
// Initial element is available immediately: kotlin.Unit
// Next element is not ready in 50 ms: null
// Next element is ready in 100 ms: kotlin.Unit
// Consumer pauses for 150ms
// Next element is available immediately after large consumer delay: kotlin.Unit
// Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
ticker 는 consumer 의 pause 를 기본적으로 인지하고, 생성된 element 간의 fixed rate 를 유지하기 위해 다음 produced element 를 delay 하여 전달해준다.
optional param 인 mode 에 TickerMode.FIXED_DELAY 값을 줄 수 있는데, 이는 element 간의 fixed delay 를 유지해준다.
'프로그래밍 놀이터 > Kotlin, Coroutine' 카테고리의 다른 글
| [kotlin] crossinline 에 대해 알아보자. (0) | 2020.08.05 |
|---|---|
| [coroutine] Shared mutable state and concurrency (0) | 2020.03.13 |
| [coroutine] Asyncronous Flow (0) | 2020.03.11 |
| [coroutine] Composing Suspending Functions ( suspending 함수 만들기 ) (0) | 2020.03.10 |
| [coroutine] Cancellation and Timeouts ( 취소와 타임아웃 ) (0) | 2020.03.09 |
댓글