본문 바로가기
프로그래밍 놀이터/안드로이드, Java

[coroutine] Channels

by 돼지왕왕돼지 2020. 3. 12.

[coroutine] Channels


https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/channels.md#channels


Channel basics


-

Channel 은 BlockingQueue 와 매우 비슷한 개념이다.

한가지 큰 차이는 blocking 하는 put 대신 suspending function 인 send 가, 

blocking 하는 take 대신 suspending function 인 receive 가 있다는 것이다.

fun main() = runBlocking{
    val channel = Channel<Int>()
    launch{
        for( x in 1..5) channel.send( x * x )
    }
    repeat(5) { println( channel.receive() ) }
    println("Done!")
}

// 1
// 4
// 9
// 16
// 25
// Done!




Closing and iteration over channels


-

queue 와 달리 channel 은 더 이상의 element 가 없음을 close 로 표현할 수 있다.

이렇게 하면 receiver side 에서는 channel 을 통해 element 를 받기 위해 for loop 을 쉽게 적용할 수 있다.


개념상 close 는 특별한 close token 을 channel 에 보내는 것과 같다.

반복(iteration)은 close token 이 전달되는 순간 끝나고, close 가 받아지기 전 모든 보내진 element 를 받는 것은 보장된다.

fun main() = runBlocking{
    val channel = Channel<Int>()
    launch{
        for( x in 1..5) channel.send( x * x )
        channel.close()
    }
    for( y in channel ) println(y)
    println("Done!")
}

// 1
// 4
// 9
// 16
// 25
// Done!




Building channel producers


-

coroutine 이 element sequence 를 만들어 내는 패턴은 아주 일반적이다.

producer-consumer pattern 은 동시성 코드에서 자주 찾아볼 수 있다.

이 producer 를 function 으로 만들 수 있고, channel 을 param 으로 받게 할 수 있다. 

반면에 결과는 반드시 function 으로부터 return 되어야 한다.


produce 라는 coroutine builder 를 사용하면 쉽게 producer side 를 구현할 수 있다.

그리고 extension function 인 "consumeEach" 를 통해 consume 의 for loop 를 대체할 수 있다.

// produce block 의 return 이 channel 에 대한 close 를 불러준다고 이해하면 되겠다.
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce{
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking{
    val squares = produceSquares()
    squares.consumeEach { println( it ) }
    println("Done!")
}

// 1
// 4
// 9
// 16
// 25
// Done!




Pipelines


-

pipeline 은 하나의 coroutine 이 produce 하는 경우에 사용하는 pattern 이다. (영구적으로 produce 할수도)

그리고 다른 coroutine(s) 가 해당 stream 을 consume 하면서 process 해서 또 다른 producer를 만들어내는 경우에 사용한다.

fun main() = runBlocking{
    val numbers = produceNumbers()
    val squares = square(numbers)
    for (i in 1..5) println(squares.receive())
    println("Done!")
    coroutineContext.cancelChildren()
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++)
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) : ReceiveChannel<Int> = produce{
    for ( x in numbers ) send( x * x )
}

// 1
// 4
// 9
// 16
// 25
// Done!

coroutine 을 만드는 모든 함수들은 CoroutineScope 의 extension 으로 정의되어 있다.

그래서 우리는 structured concurrency 를 사용하기 위해서 오래 지속되는 global coroutine 을 도입하지 않았다.





Prime numbers with pipeline


-

예제를 통해 pipeline 을 극단으로 써보자.

예제는 coroutine pipeline 을 이용해서 소수를 생성한다.

fun main() = runBlocking{
    var cur = numbersFrom(2)
    for ( i in 1..10 ){
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren()
}

fun CoroutineScope.numbersFrom(start:Int) = produce<Int>{
    var x = start
    while(true) send(x++)
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime:Int) = produce<Int>{
    for(x in numbers) if (x % prime != 0) send(x)
}

// 2
// 3
// 5
// 7
// 11
// 13
// 17
// 19
// 23
// 29

iterator coroutine builder 를 사용해서 같은 pipeline 을 만들 수 있다.

produce 대신 iterator 로 교체하고, send 대신 yield, receive 대신 next, ReceiveChannel 대신 Iterator 를 사용하고, coroutine scope 를 제거하면 된다.

runBlocking 도 필요없어진다.

하지만 channel 을 사용한 pipeline 의 장점은 Dispatchers.Default context 를 사용하면 multi core 를 활용할 수 있다는 점이다.


어쨌든 위의 방법은 소수를 찾는 현실적인 방법은 아니다.


현실에서는 pipeline 이 suspending function 을 호출하기도 하는데, suspending functino 을 사용하는 pipeline 은 sequence 나 iterator 로 대체할 수 없다.

왜냐하면 그것들은 async 할 수 있는 produce 와는 다르게 suspension 을 허락하지 않기 때문이다.





Fan-out


-

여러개의 coroutine 이 동일한 channel 에서 값을 받을 수 있다.

이를 통해 일을 분배할 수 있다.

fun main() = runBlocking<Unit>{
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel()
}

fun CoroutineScope.produceNumbers() = produce<Int>{
    var x = 1
    while(true){
        send(x++)
        delay(100)
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch{
    for (msg in channel){
        println("Processor #$id received $msg")
    }
}

// Processor #0 received 1
// Processor #0 received 2
// Processor #1 received 3
// Processor #2 received 4
// Processor #3 received 5
// Processor #4 received 6
// Processor #0 received 7
// Processor #1 received 8
// Processor #2 received 9
// Processor #3 received 10

위의 결과에서 id 의 순서는 바뀔 수 있다.


producer coroutine 의 취소는 channel 의 close 와 연결된다.

따라서 결국 channel 을 처리하는 iteration 도 종료된다.


그리고 channel 에 대한 for loop 가 fan-out 을 어떻게 처리하는지 iterate 가 어떻게 도는지 보고파악해 보자.

consumeEach 와 달리 for loop pattern 은 multiple coroutine 에 대해서 완벽하게 안전하다.

만약에 한 processor 의 coroutine 이 실패해도 다른 녀석들은 여전히 channel 을 처리한다.

반면에 consumeEach 로 하는경우는 보통의 혹은 에러로 종료되었을 때 관련된 channel 을 취소시킨다.








Fan-in


-

여러개의 coroutine 이 하나의 channel 에 보낼 수도 있다.

fun main() = runBlocking{
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) {
        println(channel.receive())
    }
    coroutineContext.cancelChildren()
}

suspend fun sendString(channel: SendChannel<String>, s:String, time:Long){
    while(true){
        delay(time)
        channel.send(s)
    }
}

// foo
// foo
// BAR!
// foo
// foo
// BAR!




Buffered channels


-

지금까지 본 channel 은 buffer 가 없었다.

buffer 가 없는 channel 은 sender 와 receiver 가 서로 맞아야만 가능하다. (약속의 광장?)

만약 send 가 먼저 불리면, receive 가 불릴 때까지 suspend 하고,

만약 receive 가 먼저 불리면, send 가 불릴때까지 suspend 한다.



-

Channel() factory function 과 produce builder 는 capacity 라는 parameter 를 받는데 이는 buffer size 이다.

Buffer 는 BlockingQueue 가 capacity 를 가진 것처럼, sender 가 suspending 하기 전에 보낼 수 있고, buffer 가 다 차면 suspending 된다.

fun main() = runBlocking{
    val channel = Channel<Int>(4) // buffer size 4 channel
    val sender = launch {
        repeat(10){
            println("Sending $it")
            channel.send(it) // buffer 가 full 되기 전까지는 suspend 하지 않는다.
        }
    }
    delay(1000)
    sender.cancel()
}

// Sending 0
// Sending 1
// Sending 2
// Sending 3
// Sending 4

첫 4개의 element 는 buffer 에 담겨지고, 다섯번째 녀석을 보내기 전에 suspend 된다.





Channels are fair


-

channel  에 대한 send 와 receive 는 여러 코루틴으로부터의 호출로부터 순서적 입장에서 fair 하다.

선착순이다.

data class Ball(var hits: Int)

fun main() = runBlocking{
    val table = Channel<Ball>()
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0))
    delay(1000)
    coroutineContext.cancelChildren()
}

suspend fun player(name:String, table:Channel<Ball>){
    for (ball in table){
        ball.hits++
        println("$name $ball")
        delay(300)
        table.send(ball)
    }
}

// ping Ball(hits=1)
// pong Ball(hits=2)
// ping Ball(hits=3)
// pong Ball(hits=4)

"ping" coroutine 이 먼저 시작되어서 먼저 ball 을 받는다.

"ping" coroutine 이 send 를 호출한 후 바로 다시 ball 받을 수도 있지만, ball 은 "pong" coroutine 으로 간다.

왜냐하면 "pong" 이 먼저 기다리고 있었고, channel 은 fair 하기 때문이다.


가끔 channel 이 사용되는 executor 의 종류에 따라 unfair 한듯한 동작을 하는 것처럼 보일 수 있다.





Ticker channels


-

Ticker channel 은 특별한 만남의 광장(?) channel 로 매번 마지막 consume time 으로부터 특정 delay 후에 Unit 을 만들어 낸다.
( 돼왕 : 아래 실험 결과를 보면 마지막 consume time 으로부터 특정 delay 가 아니라, 그냥 특정 delay 로 Unit 을 만들어내며, last value 에 접근할 수 있다. 기본값인 FIXED_PERIOD 의 역할이며, 글에서 설명하는 마지막 consume 시점으로부터의 time interval 은 FIXED_DLEAY 라고 볼 수 있다.)

이것 자체만 보면 별로 쓸모없어 보이지만, 복잡한 시간 기반 produce pipeline 과 windowing 을 하거나, 시간 기반 operator 를 만들 때 유용하다.

Ticker channel 은 select 에서 on tick 동작을 위해 사용될 수 있다.


ticker factory method 를 사용해서 해당 channel 을 만들 수 있다.

더 이상 사용하지 않을때는 ReceiveChannel.cancel 을 호출하면 된다.

fun main() = runBlocking{
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement")

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("Next element is ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    println("Consumer pauses for 150ms")
    delay(150)

    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("Next element is ready in 50 ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel()
}

// Initial element is available immediately: kotlin.Unit
// Next element is not ready in 50 ms: null
// Next element is ready in 100 ms: kotlin.Unit
// Consumer pauses for 150ms
// Next element is available immediately after large consumer delay: kotlin.Unit
// Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

ticker 는 consumer 의 pause 를 기본적으로 인지하고, 생성된 element 간의 fixed rate 를 유지하기 위해 다음 produced element 를 delay 하여 전달해준다.


optional param 인 mode 에 TickerMode.FIXED_DELAY 값을 줄 수 있는데, 이는 element 간의 fixed delay 를 유지해준다.




댓글5

  • 어이쿠 2020.07.15 15:15

    안녕하세요 돼왕님
    질문있습니다.

    아래와 같은 내용이 있는데
    // produce block 의 return 이 channel 에 대한 close 를 불러준다고 이해하면 되겠다.
    fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce{
    for (x in 1..5) send(x * x)
    }

    fun main() = runBlocking{
    val squares = produceSquares()
    squares.consumeEach { println( it ) }
    println("Done!")
    }


    로그를 찍어보니
    for 문안의 send가 2번 불리고(값 1과 2) consumeEach의 println이 두번불리고
    다시
    for 문안의 send가 2번 불리고(값 3과 4) consumeEach의 println이 두번불리고

    마지막에는 하나만(5) 남아서 한번만 불리고
    이러던데 무슨기준으로 2번씩 불리는건가요


    답글

    • val channel = Channel<Int>()
      launch {
      for (x in 1..5){
      println("before x = " + x)
      channel.send(x * x)
      println("after x = " + x)
      }
      channel.close()
      }

      for (y in channel) { println(y) }
      println("Done!")

      이렇게 코드를 돌려보면요,

      before x = 1
      after x = 1
      before x = 2
      1
      4
      after x = 2
      before x = 3
      after x = 3
      before x = 4
      9
      16
      after x = 4
      before x = 5
      after x = 5
      25
      Done!

      위와 같은 결과가 나옵니다.

      after 와 before 를 유의깊게 보면 suspending point 를 알 수 있는데요..
      send 코드를 마딱뜨렸을 때, receive 로 내용물이 소진 내용되지 않았다면 suspending 됩니다.

      안쪽 소스코드를 까보지 않았지만, 동작하는 것을 보면 send 를 호출했을 때 queue 에 내용을 먼저 넣어놓고 suspending 하는 것으로 보입니다.
      그러면 위의 output 과 정확히 맞아 떨어집니다.

    • https://aroundck.tistory.com/6165

      여기에 비슷한 실험을 하면서 고민한 흔적이 있는데요..
      그때도 이정도 수준에서 멈춘 것 같네요.. ㅎㅎ

      혹시 나중에라도 더 정확한 정답을 알게 되시면 공유 부탁드립니다.

  • 어이쿠 2020.07.16 13:50

    아! BlockingQueue개념이라 suspending 대기하는거군요
    이글 처음에 적어놓으셨는데 뒤로 가면서 까먹었나봅니다 ㅎㅎ
    알아보기 힘드네요 ㅎㅎ
    마지막에 저한테 숙제를 주셨네요 ㅠㅜ
    노력해보겠습니다
    좋은글, 빠른 답변, 큰 도움되었습니다
    감사합니다.

    답글

    • 숙제까지는 아니고 ㅎㅎ 서로 아는게 공유되면 서로의 시간을 세이브 하는 윈윈이니깐요!
      제가 블로그를 하는 이유도 서로의 시간이 세이브되고 도움이 되길 바라는 마음이거든요 ㅎㅎ