본문 바로가기
프로그래밍 놀이터/Kotlin, Coroutine

[coroutine] Shared mutable state and concurrency

by 돼지왕 왕돼지 2020. 3. 13.
반응형

[coroutine] Shared mutable state and concurrency


https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/shared-mutable-state-and-concurrency.md#shared-mutable-state-and-concurrency


-

Dispatchers.Default 와 같은 dispatcher 를 사용하여 coroutine 은 multi thread 에서 병렬적으로 수행될 수 있다.

이는 일반적인 병렬 관련 문제를 보여준다.

주요 문제는 공유된 변경가능한 상태에 대한 동기화이다.





The problem


-

suspend fun massiveRun(action : suspend () -> Unit){
    val n = 100
    val k = 100
    val time = measureTimeMillis{
        coroutineScope{
            repeat(n){
                launch{
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

var counter = 0

for main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun{
            counter++
        }
    }
    println("Counter = $counter")
}

// Completed 10000 actions in 29 ms
// Counter = 9866

결과는 10000 이 아닐 확률이 아주 높다.

왜냐하면 100개의 coroutine 의 counter 에 대한 증가가 동기화 없이 진행되어 multi thread 이슈를 발생시켰기 떄문이다.





Volatiles are of no help


-

volatile 이 병렬화 문제를 해결해줄거라는 잘못된 상식이 있다.

suspend fun massiveRun(action : suspend () -> Unit){
    val n = 100
    val k = 100
    val time = measureTimeMillis{
        coroutineScope{
            repeat(n){
                launch{
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

@Volatile
var counter = 0

for main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun{
            counter++
        }
    }
    println("Counter = $counter")
}

// Completed 10000 actions in 29 ms
// Counter = 9519

우선 이 코드는 느리다. 그리고 Counter = 10000 을 얻지도 못할 것이다.

왜냐하면 volatile 은 순차적 읽기와 순차적 쓰기만을 지원할분, atomic 하게 read & write 를 동시에 하는 increment 를 처리하진 못한다.





Thread-safe data structures


-

coroutine 과 thread 모두에게 동작하는 thread-safe (synchronization, linearizable, atomic) data structure 들이 제공된다.

suspend fun massiveRun(action : suspend () -> Unit){
    val n = 100
    val k = 100
    val time = measureTimeMillis{
        coroutineScope{
            repeat(n){
                launch{
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

var counter = AtomicInteger()

for main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun{
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}

// Completed 10000 actions in 46 ms
// Counter = 10000

이것은 특정 문제에 대한 가장 빠른 해결책이다.

간단한 counter, collection, queue 와 다른 일반적인 data structure 와 기본적인 동작에 대해 잘 동작한다.

그러나 이미 준비되지 않은 thread-safe 관련 구현이 없거나, 어려운 상태의 문제가 있을 때 이를 확장하는 것은 쉽지 않다.





Thread confinement fine-grained


-

Thread confinement(스레드 한정) 은 shared mutable state 에 대한 접근을 single thread 로 한정시키는 것이다.

이것은 보통 UI app 에서 자주 쓰는 방법으로 UI state 는 single thread 에서만 접근 가능하다.

suspend fun massiveRun(action : suspend () -> Unit){
    val n = 100
    val k = 100
    val time = measureTimeMillis{
        coroutineScope{
            repeat(n){
                launch{
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

var counterContext = newSingleThreadContext("CoroutineContext")
var counter = 0

for main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun{
            withContext(counterContext){
                counter++
            }
        }
    }
    println("Counter = $counter")
}

// Completed 10000 actions in 1439 ms
// Counter = 10000

코드는 매우 느리게 작동한다. 그러나 thread 한정은 잘 되었다.

각각의 증가는 multi-multithread 의 Dispatchers.Default 에서 single-threaded context 로의 전환이 발생한다.








Thread confinement coarse-grained


-

실생활에서 thread 한정은 큰 코드 블락에 적용되곤 한다.

suspend fun massiveRun(action : suspend () -> Unit){
    val n = 100
    val k = 100
    val time = measureTimeMillis{
        coroutineScope{
            repeat(n){
                launch{
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

var counterContext = newSingleThreadContext("CoroutineContext")
var counter = 0

for main() = runBlocking{
    withContext(coroutineContext{
        massiveRun{
            counter++
        }
    }
    println("Counter = $counter")
}

// Completed 10000 actions in 35 ms
// Counter = 10000

이렇게 하면 context switch 가 적어서 빨리 실행된다.





Mutual exclusion


-

상호 배제 방법은 shared state 에 대한 모든 수정의 critical section 이 절대 동시에 일어나지 않게 하여 보호하는 방법이다.

Blocking world 에서는 보통 synchronized 나 ReentrantLock 을 사용한다.

Coroutine 에서는 Mutex 라고 불리는 녀석을 사용한다.

이 녀석은 lock 과 unlock function 이 있어서 critical section 을 구분한다.

Mutex.lock() 의 큰 차이는 suspending function 이라 thread 를 block 하지 않는다는 것이다.


역시 withLock 이라는 extension function 도 있다. 

mutext.lock(); try { ... } finally { mutext.unlock() } pattern 을 간편하게 할 수 있다.

suspend fun massiveRun(action : suspend () -> Unit){
    val n = 100
    val k = 100
    val time = measureTimeMillis{
        coroutineScope{
            repeat(n){
                launch{
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

var mutext = Mutex()
var counter = 0

for main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun{
            mutext.withLock{
                counter++
            }
        }
    }
    println("Counter = $counter")
}

// Completed 10000 actions in 496 ms
// Counter = 10000

이 예제에서의 lock 은 잘 구분되었고, 그래서 비용지불이 필요하다.

하지만 특정 데이터를 주기적으로 업데이트해야 한다면 좋은 선택이 될 수 있다.

thread 한정 없이 값을 업데이트 할 수 있다.

( 돼왕: context switch 보다 훨씬 적은 비용이 들었다. )





Actors


-

actor 는 coroutine 과 coroutine 에 한정된 상태에 대한 조합으로 만들어진 녀석이다.

그리고 다른 coroutine 과 통신하기 위한 채널이 되기도 한다.

간단한 actor 는 함수로 쓰여질 수 있지만, 복잡한 상태를 가진 녀석은 class 가 적합하다.



-

actor coroutine builder 가 있어, actor 의  메시지를 우체통 채널과 결과를 보내는 channel 을 결합시킨다.

그래서 actor 에 대한 하나의 참조로 이 둘을 동시에 할 수 있다.



-

actor 를 사용하는 첫번째 방법은 actor 가 처리할 메세지 클래스들을 정의하는 것이다.

Kotlin 의 sealed class 가 이 목적에 적합하다.

CompletableDeferred 는 미래에 single value 가 return 될 것이라는 목적으로 사용된다.

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg>{
    var counter = 0
    var (msg in channel){
        when(msg){
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

suspend fun massiveRun(action : suspend () -> Unit){
    val n = 100
    val k = 100
    val time = measureTimeMillis{
        coroutineScope{
            repeat(n){
                launch{
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

fun main() = runBlocking<Unit>{
    val counter = counterActor()
    withContext(Dispatcher.Default){
        massiveRun{
            counter.send(IncCounter)
        }
    }
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close()
}

actor 가 어떤 context 에서 수행되는지는 중요치 않다. (결과의 정확도에 영향을 미치지 않는다.)

actor 는 coroutine 이며 순차적으로 실행된다. 그래서 가변 공유 상태에 또 다른 한정 방법으로 해결책이 될 수 있다.

사실 actor 는 private 상태를 msg 를 통해서만 update 할 수 있어 lock 을 사용할 필요도 없다.


actor 는 locking 방법보다 더 효율적이다. 왜냐하면 context change 가 발생하지 않기 때문이다.




반응형

댓글