본문 바로가기
프로그래밍 놀이터/Kotlin, Coroutine

[coroutine] Channels

by 돼지왕 왕돼지 2020. 3. 12.
반응형

[coroutine] Channels


https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/channels.md#channels


Channel basics


-

Channel 은 BlockingQueue 와 매우 비슷한 개념이다.

한가지 큰 차이는 blocking 하는 put 대신 suspending function 인 send 가, 

blocking 하는 take 대신 suspending function 인 receive 가 있다는 것이다.

fun main() = runBlocking{
    val channel = Channel<Int>()
    launch{
        for( x in 1..5) channel.send( x * x )
    }
    repeat(5) { println( channel.receive() ) }
    println("Done!")
}

// 1
// 4
// 9
// 16
// 25
// Done!




Closing and iteration over channels


-

queue 와 달리 channel 은 더 이상의 element 가 없음을 close 로 표현할 수 있다.

이렇게 하면 receiver side 에서는 channel 을 통해 element 를 받기 위해 for loop 을 쉽게 적용할 수 있다.


개념상 close 는 특별한 close token 을 channel 에 보내는 것과 같다.

반복(iteration)은 close token 이 전달되는 순간 끝나고, close 가 받아지기 전 모든 보내진 element 를 받는 것은 보장된다.

fun main() = runBlocking{
    val channel = Channel<Int>()
    launch{
        for( x in 1..5) channel.send( x * x )
        channel.close()
    }
    for( y in channel ) println(y)
    println("Done!")
}

// 1
// 4
// 9
// 16
// 25
// Done!




Building channel producers


-

coroutine 이 element sequence 를 만들어 내는 패턴은 아주 일반적이다.

producer-consumer pattern 은 동시성 코드에서 자주 찾아볼 수 있다.

이 producer 를 function 으로 만들 수 있고, channel 을 param 으로 받게 할 수 있다. 

반면에 결과는 반드시 function 으로부터 return 되어야 한다.


produce 라는 coroutine builder 를 사용하면 쉽게 producer side 를 구현할 수 있다.

그리고 extension function 인 "consumeEach" 를 통해 consume 의 for loop 를 대체할 수 있다.

// produce block 의 return 이 channel 에 대한 close 를 불러준다고 이해하면 되겠다.
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce{
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking{
    val squares = produceSquares()
    squares.consumeEach { println( it ) }
    println("Done!")
}

// 1
// 4
// 9
// 16
// 25
// Done!




Pipelines


-

pipeline 은 하나의 coroutine 이 produce 하는 경우에 사용하는 pattern 이다. (영구적으로 produce 할수도)

그리고 다른 coroutine(s) 가 해당 stream 을 consume 하면서 process 해서 또 다른 producer를 만들어내는 경우에 사용한다.

fun main() = runBlocking{
    val numbers = produceNumbers()
    val squares = square(numbers)
    for (i in 1..5) println(squares.receive())
    println("Done!")
    coroutineContext.cancelChildren()
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++)
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) : ReceiveChannel<Int> = produce{
    for ( x in numbers ) send( x * x )
}

// 1
// 4
// 9
// 16
// 25
// Done!

coroutine 을 만드는 모든 함수들은 CoroutineScope 의 extension 으로 정의되어 있다.

그래서 우리는 structured concurrency 를 사용하기 위해서 오래 지속되는 global coroutine 을 도입하지 않았다.





Prime numbers with pipeline


-

예제를 통해 pipeline 을 극단으로 써보자.

예제는 coroutine pipeline 을 이용해서 소수를 생성한다.

fun main() = runBlocking{
    var cur = numbersFrom(2)
    for ( i in 1..10 ){
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren()
}

fun CoroutineScope.numbersFrom(start:Int) = produce<Int>{
    var x = start
    while(true) send(x++)
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime:Int) = produce<Int>{
    for(x in numbers) if (x % prime != 0) send(x)
}

// 2
// 3
// 5
// 7
// 11
// 13
// 17
// 19
// 23
// 29

iterator coroutine builder 를 사용해서 같은 pipeline 을 만들 수 있다.

produce 대신 iterator 로 교체하고, send 대신 yield, receive 대신 next, ReceiveChannel 대신 Iterator 를 사용하고, coroutine scope 를 제거하면 된다.

runBlocking 도 필요없어진다.

하지만 channel 을 사용한 pipeline 의 장점은 Dispatchers.Default context 를 사용하면 multi core 를 활용할 수 있다는 점이다.


어쨌든 위의 방법은 소수를 찾는 현실적인 방법은 아니다.


현실에서는 pipeline 이 suspending function 을 호출하기도 하는데, suspending functino 을 사용하는 pipeline 은 sequence 나 iterator 로 대체할 수 없다.

왜냐하면 그것들은 async 할 수 있는 produce 와는 다르게 suspension 을 허락하지 않기 때문이다.





Fan-out


-

여러개의 coroutine 이 동일한 channel 에서 값을 받을 수 있다.

이를 통해 일을 분배할 수 있다.

fun main() = runBlocking<Unit>{
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel()
}

fun CoroutineScope.produceNumbers() = produce<Int>{
    var x = 1
    while(true){
        send(x++)
        delay(100)
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch{
    for (msg in channel){
        println("Processor #$id received $msg")
    }
}

// Processor #0 received 1
// Processor #0 received 2
// Processor #1 received 3
// Processor #2 received 4
// Processor #3 received 5
// Processor #4 received 6
// Processor #0 received 7
// Processor #1 received 8
// Processor #2 received 9
// Processor #3 received 10

위의 결과에서 id 의 순서는 바뀔 수 있다.


producer coroutine 의 취소는 channel 의 close 와 연결된다.

따라서 결국 channel 을 처리하는 iteration 도 종료된다.


그리고 channel 에 대한 for loop 가 fan-out 을 어떻게 처리하는지 iterate 가 어떻게 도는지 보고파악해 보자.

consumeEach 와 달리 for loop pattern 은 multiple coroutine 에 대해서 완벽하게 안전하다.

만약에 한 processor 의 coroutine 이 실패해도 다른 녀석들은 여전히 channel 을 처리한다.

반면에 consumeEach 로 하는경우는 보통의 혹은 에러로 종료되었을 때 관련된 channel 을 취소시킨다.








Fan-in


-

여러개의 coroutine 이 하나의 channel 에 보낼 수도 있다.

fun main() = runBlocking{
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) {
        println(channel.receive())
    }
    coroutineContext.cancelChildren()
}

suspend fun sendString(channel: SendChannel<String>, s:String, time:Long){
    while(true){
        delay(time)
        channel.send(s)
    }
}

// foo
// foo
// BAR!
// foo
// foo
// BAR!




Buffered channels


-

지금까지 본 channel 은 buffer 가 없었다.

buffer 가 없는 channel 은 sender 와 receiver 가 서로 맞아야만 가능하다. (약속의 광장?)

만약 send 가 먼저 불리면, receive 가 불릴 때까지 suspend 하고,

만약 receive 가 먼저 불리면, send 가 불릴때까지 suspend 한다.



-

Channel() factory function 과 produce builder 는 capacity 라는 parameter 를 받는데 이는 buffer size 이다.

Buffer 는 BlockingQueue 가 capacity 를 가진 것처럼, sender 가 suspending 하기 전에 보낼 수 있고, buffer 가 다 차면 suspending 된다.

fun main() = runBlocking{
    val channel = Channel<Int>(4) // buffer size 4 channel
    val sender = launch {
        repeat(10){
            println("Sending $it")
            channel.send(it) // buffer 가 full 되기 전까지는 suspend 하지 않는다.
        }
    }
    delay(1000)
    sender.cancel()
}

// Sending 0
// Sending 1
// Sending 2
// Sending 3
// Sending 4

첫 4개의 element 는 buffer 에 담겨지고, 다섯번째 녀석을 보내기 전에 suspend 된다.





Channels are fair


-

channel  에 대한 send 와 receive 는 여러 코루틴으로부터의 호출로부터 순서적 입장에서 fair 하다.

선착순이다.

data class Ball(var hits: Int)

fun main() = runBlocking{
    val table = Channel<Ball>()
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0))
    delay(1000)
    coroutineContext.cancelChildren()
}

suspend fun player(name:String, table:Channel<Ball>){
    for (ball in table){
        ball.hits++
        println("$name $ball")
        delay(300)
        table.send(ball)
    }
}

// ping Ball(hits=1)
// pong Ball(hits=2)
// ping Ball(hits=3)
// pong Ball(hits=4)

"ping" coroutine 이 먼저 시작되어서 먼저 ball 을 받는다.

"ping" coroutine 이 send 를 호출한 후 바로 다시 ball 받을 수도 있지만, ball 은 "pong" coroutine 으로 간다.

왜냐하면 "pong" 이 먼저 기다리고 있었고, channel 은 fair 하기 때문이다.


가끔 channel 이 사용되는 executor 의 종류에 따라 unfair 한듯한 동작을 하는 것처럼 보일 수 있다.





Ticker channels


-

Ticker channel 은 특별한 만남의 광장(?) channel 로 매번 마지막 consume time 으로부터 특정 delay 후에 Unit 을 만들어 낸다.
( 돼왕 : 아래 실험 결과를 보면 마지막 consume time 으로부터 특정 delay 가 아니라, 그냥 특정 delay 로 Unit 을 만들어내며, last value 에 접근할 수 있다. 기본값인 FIXED_PERIOD 의 역할이며, 글에서 설명하는 마지막 consume 시점으로부터의 time interval 은 FIXED_DLEAY 라고 볼 수 있다.)

이것 자체만 보면 별로 쓸모없어 보이지만, 복잡한 시간 기반 produce pipeline 과 windowing 을 하거나, 시간 기반 operator 를 만들 때 유용하다.

Ticker channel 은 select 에서 on tick 동작을 위해 사용될 수 있다.


ticker factory method 를 사용해서 해당 channel 을 만들 수 있다.

더 이상 사용하지 않을때는 ReceiveChannel.cancel 을 호출하면 된다.

fun main() = runBlocking{
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement")

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("Next element is ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    println("Consumer pauses for 150ms")
    delay(150)

    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("Next element is ready in 50 ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel()
}

// Initial element is available immediately: kotlin.Unit
// Next element is not ready in 50 ms: null
// Next element is ready in 100 ms: kotlin.Unit
// Consumer pauses for 150ms
// Next element is available immediately after large consumer delay: kotlin.Unit
// Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

ticker 는 consumer 의 pause 를 기본적으로 인지하고, 생성된 element 간의 fixed rate 를 유지하기 위해 다음 produced element 를 delay 하여 전달해준다.


optional param 인 mode 에 TickerMode.FIXED_DELAY 값을 줄 수 있는데, 이는 element 간의 fixed delay 를 유지해준다.




반응형

댓글