[coroutine] Shared mutable state and concurrency |
-
Dispatchers.Default 와 같은 dispatcher 를 사용하여 coroutine 은 multi thread 에서 병렬적으로 수행될 수 있다.
이는 일반적인 병렬 관련 문제를 보여준다.
주요 문제는 공유된 변경가능한 상태에 대한 동기화이다.
The problem
-
suspend fun massiveRun(action : suspend () -> Unit){ val n = 100 val k = 100 val time = measureTimeMillis{ coroutineScope{ repeat(n){ launch{ repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } var counter = 0 for main() = runBlocking{ withContext(Dispatchers.Default){ massiveRun{ counter++ } } println("Counter = $counter") } // Completed 10000 actions in 29 ms // Counter = 9866
결과는 10000 이 아닐 확률이 아주 높다.
왜냐하면 100개의 coroutine 의 counter 에 대한 증가가 동기화 없이 진행되어 multi thread 이슈를 발생시켰기 떄문이다.
Volatiles are of no help
-
volatile 이 병렬화 문제를 해결해줄거라는 잘못된 상식이 있다.
suspend fun massiveRun(action : suspend () -> Unit){ val n = 100 val k = 100 val time = measureTimeMillis{ coroutineScope{ repeat(n){ launch{ repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } @Volatile var counter = 0 for main() = runBlocking{ withContext(Dispatchers.Default){ massiveRun{ counter++ } } println("Counter = $counter") } // Completed 10000 actions in 29 ms // Counter = 9519
우선 이 코드는 느리다. 그리고 Counter = 10000 을 얻지도 못할 것이다.
왜냐하면 volatile 은 순차적 읽기와 순차적 쓰기만을 지원할분, atomic 하게 read & write 를 동시에 하는 increment 를 처리하진 못한다.
Thread-safe data structures
-
coroutine 과 thread 모두에게 동작하는 thread-safe (synchronization, linearizable, atomic) data structure 들이 제공된다.
suspend fun massiveRun(action : suspend () -> Unit){ val n = 100 val k = 100 val time = measureTimeMillis{ coroutineScope{ repeat(n){ launch{ repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } var counter = AtomicInteger() for main() = runBlocking{ withContext(Dispatchers.Default){ massiveRun{ counter.incrementAndGet() } } println("Counter = $counter") } // Completed 10000 actions in 46 ms // Counter = 10000
이것은 특정 문제에 대한 가장 빠른 해결책이다.
간단한 counter, collection, queue 와 다른 일반적인 data structure 와 기본적인 동작에 대해 잘 동작한다.
그러나 이미 준비되지 않은 thread-safe 관련 구현이 없거나, 어려운 상태의 문제가 있을 때 이를 확장하는 것은 쉽지 않다.
Thread confinement fine-grained
-
Thread confinement(스레드 한정) 은 shared mutable state 에 대한 접근을 single thread 로 한정시키는 것이다.
이것은 보통 UI app 에서 자주 쓰는 방법으로 UI state 는 single thread 에서만 접근 가능하다.
suspend fun massiveRun(action : suspend () -> Unit){ val n = 100 val k = 100 val time = measureTimeMillis{ coroutineScope{ repeat(n){ launch{ repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } var counterContext = newSingleThreadContext("CoroutineContext") var counter = 0 for main() = runBlocking{ withContext(Dispatchers.Default){ massiveRun{ withContext(counterContext){ counter++ } } } println("Counter = $counter") } // Completed 10000 actions in 1439 ms // Counter = 10000
코드는 매우 느리게 작동한다. 그러나 thread 한정은 잘 되었다.
각각의 증가는 multi-multithread 의 Dispatchers.Default 에서 single-threaded context 로의 전환이 발생한다.
Thread confinement coarse-grained
-
실생활에서 thread 한정은 큰 코드 블락에 적용되곤 한다.
suspend fun massiveRun(action : suspend () -> Unit){ val n = 100 val k = 100 val time = measureTimeMillis{ coroutineScope{ repeat(n){ launch{ repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } var counterContext = newSingleThreadContext("CoroutineContext") var counter = 0 for main() = runBlocking{ withContext(coroutineContext{ massiveRun{ counter++ } } println("Counter = $counter") } // Completed 10000 actions in 35 ms // Counter = 10000
이렇게 하면 context switch 가 적어서 빨리 실행된다.
Mutual exclusion
-
상호 배제 방법은 shared state 에 대한 모든 수정의 critical section 이 절대 동시에 일어나지 않게 하여 보호하는 방법이다.
Blocking world 에서는 보통 synchronized 나 ReentrantLock 을 사용한다.
Coroutine 에서는 Mutex 라고 불리는 녀석을 사용한다.
이 녀석은 lock 과 unlock function 이 있어서 critical section 을 구분한다.
Mutex.lock() 의 큰 차이는 suspending function 이라 thread 를 block 하지 않는다는 것이다.
역시 withLock 이라는 extension function 도 있다.
mutext.lock(); try { ... } finally { mutext.unlock() } pattern 을 간편하게 할 수 있다.
suspend fun massiveRun(action : suspend () -> Unit){ val n = 100 val k = 100 val time = measureTimeMillis{ coroutineScope{ repeat(n){ launch{ repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } var mutext = Mutex() var counter = 0 for main() = runBlocking{ withContext(Dispatchers.Default){ massiveRun{ mutext.withLock{ counter++ } } } println("Counter = $counter") } // Completed 10000 actions in 496 ms // Counter = 10000
이 예제에서의 lock 은 잘 구분되었고, 그래서 비용지불이 필요하다.
하지만 특정 데이터를 주기적으로 업데이트해야 한다면 좋은 선택이 될 수 있다.
thread 한정 없이 값을 업데이트 할 수 있다.
( 돼왕: context switch 보다 훨씬 적은 비용이 들었다. )
Actors
-
actor 는 coroutine 과 coroutine 에 한정된 상태에 대한 조합으로 만들어진 녀석이다.
그리고 다른 coroutine 과 통신하기 위한 채널이 되기도 한다.
간단한 actor 는 함수로 쓰여질 수 있지만, 복잡한 상태를 가진 녀석은 class 가 적합하다.
-
actor coroutine builder 가 있어, actor 의 메시지를 우체통 채널과 결과를 보내는 channel 을 결합시킨다.
그래서 actor 에 대한 하나의 참조로 이 둘을 동시에 할 수 있다.
-
actor 를 사용하는 첫번째 방법은 actor 가 처리할 메세지 클래스들을 정의하는 것이다.
Kotlin 의 sealed class 가 이 목적에 적합하다.
CompletableDeferred 는 미래에 single value 가 return 될 것이라는 목적으로 사용된다.
sealed class CounterMsg object IncCounter : CounterMsg() class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() fun CoroutineScope.counterActor() = actor<CounterMsg>{ var counter = 0 var (msg in channel){ when(msg){ is IncCounter -> counter++ is GetCounter -> msg.response.complete(counter) } } } suspend fun massiveRun(action : suspend () -> Unit){ val n = 100 val k = 100 val time = measureTimeMillis{ coroutineScope{ repeat(n){ launch{ repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } fun main() = runBlocking<Unit>{ val counter = counterActor() withContext(Dispatcher.Default){ massiveRun{ counter.send(IncCounter) } } val response = CompletableDeferred<Int>() counter.send(GetCounter(response)) println("Counter = ${response.await()}") counter.close() }
actor 가 어떤 context 에서 수행되는지는 중요치 않다. (결과의 정확도에 영향을 미치지 않는다.)
actor 는 coroutine 이며 순차적으로 실행된다. 그래서 가변 공유 상태에 또 다른 한정 방법으로 해결책이 될 수 있다.
사실 actor 는 private 상태를 msg 를 통해서만 update 할 수 있어 lock 을 사용할 필요도 없다.
actor 는 locking 방법보다 더 효율적이다. 왜냐하면 context change 가 발생하지 않기 때문이다.
'프로그래밍 놀이터 > Kotlin, Coroutine' 카테고리의 다른 글
[Koin] Koin 에 대해 알아보자 (tutorial) (0) | 2020.08.06 |
---|---|
[kotlin] crossinline 에 대해 알아보자. (0) | 2020.08.05 |
[coroutine] Channels (5) | 2020.03.12 |
[coroutine] Asyncronous Flow (0) | 2020.03.11 |
[coroutine] Composing Suspending Functions ( suspending 함수 만들기 ) (0) | 2020.03.10 |
댓글