[Kotlin] Coroutines tutorial - async code 쉽게 짜기
Kotlin coroutines 는 async code 를 sync code 의 구조를 바꾸지 않고 짤 수 있게 도와준다.
Hello World Coroutine
// 결과는..
// Hello,
// World!
기본적으로 coroutine 은 light-weight thread 이다.
launch 라는 coroutine builder 를 통해 수행 된다.
launch{ … }, thread{ … } 과 비슷하고, delay(…) 은 Thread.sleep(…) 과 비슷하다.
여기서 비슷하다는 것은...
delay 함수는 non-blocking 으로 suspending function 이며, coroutine 에서만 사용될 수 있다.
Bridging blocking and non-blocking worlds
runBlocking { } 안에서 delay 를 수행하면 Thread.sleep 과 같은 효과를 얻을 수 있다.
fun main(args:Array<String>) = runBlocking<Unit>{
위와 같이 runBlocking{ } 을 사용할 수도 있다.
Waiting for a job
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
다른 coroutine 이 종료될 때까지 delay 를 하는 것은 좋은 구현이 아니다.
job 을 사용해서 join() 을 이용하는 것이 추천된다.
Extract function refactoring
fun main(args:Array<String>) = runBlocking<Unit>{
val job = launch { doWorld() }
suspend fun doWorld(){
suspending function 은 다른 일반 function 과 같이 쓰일 수 있다.
하지만 suspend function 은 delay 와 같은 suspending function 을 쓸 수 있다.
Coroutines ARE light-weight
val jobs = List(100_000){
jobs.forEach{ it.join() }
위 구문을 thread 로 처리했다면 OOM 이 났을 것이다.
Coroutines are like daemon threads
repeat(1000){ i ->
println(“I’m sleeping $i …”)
// output
// I’m sleeping 0 …
// I’m sleeping 1 …
// I’m sleeping 2 …
3개만 프린트하고 끝난다.
Active coroutine 은 process 를 alive 상태로 두지 않고 , daemon thread 같이 작동 한다.
cf) 데몬쓰레드는 일반 쓰레드(main 등)가 모두 종료되면 강제적으로 종료되는 특징을 가지고 있다.
Cancellation and timeouts - modified
val job = launch{
repeat(1000){ i ->
println(“I’m sleeping $i …”)
println(“main: I’m tired of waiting!”)
job.join() // job.cancelAndJoin 을 호출해도 된다.
println(“main: Now I can quit.”)
// output
// I'm sleeping 0 ...
// I'm sleeping 1 ...
// I'm sleeping 2 ...
// main: I'm tired of waiting!
// main: Now I can quit.
모든 suspending function 은 cancellable 하다.
suspending function 은 coroutine 의 cancel 을 확인하고 CancellationException 을 던진다.
그러나 다음과 같은 경우는 suspending function 이 아닌 단순 computation 이기 떄문에 cancel 이 안 된다.
val startTime = System.currentTimeMillis()
val job = launch{
var nextPrintTime = startTime
var i = 0
while( i < 5 ){
if(System.currentTimeMillis() >= nextPrintTime){
println(“I’m sleeping ${i++} …”)
nextPrintTime += 500L
println(“main: I’m tired of waiting!”)
println(“main: Now I can quit.”)
위와 같은 computation code 를 cancellable 로 만들어보자.
첫번째 방법은 suspending function 을 사용한다. 여기에 맞는 suspending function 은 “yield” 이다.
다른 방법은 매번 cancellation status 를 체크하는 것이다.
val job = launch{
var nextPrintTime = startTime
var i = 0
while( isActive ){
if(System.currentTimeMillis() >= nextPrintTime){
println(“I’m sleeping ${i++} …”)
nextPrintTime += 500L
isActive 는 coroutine 의 cancel 상태를 확인한다.
Cancaellable suspending function 은 CancellationException 을 던진다.
이때를 확인하기 위해 try, finally 구문을 사용할 수 있다.
위의 케이스(cancel 로 CancellatationException 이 발생하면)의 finally 블럭에서 다시 suspending function 을 사용하면 CancellationException 이 바로 또 발생한다.
보통은 문제가 되지 않지만, 몇몇 케이스에 suspending function 을 사용해야 한다면 withContext 를 사용해야 한다.
val job = launch{
repeat(1000){ i ->
println(“I’m sleeping $i …”)
println(“I’m running finally”)
println(“And I’ve just delayed for 1 sec because I’m non-cancellable”)
println("main: I'm tired of waiting!")
println("main: Now I can quit.")
job 을 두고 직접 cancel 하는 것이 아닌 Timeout 을 설정하는 방법도 있다.
repeat(1000){ i ->
println(“I’m sleeping $i …”)
TimeoutCancellationException 이 발생한다. (CancellationException 의 subclass)
coroutine 에서 CancellationException 은 일반적인 coroutine 의 종료로 보는데, main 에서 수행한 withTimeout 의 TimeoutCancellationException 은 단순 exception 으로 처리된다.
resource 를 정리할 것이 있다면 try, catch 를 사용해야 하고,
그렇지 않으면. withTimeoutOrNull 을 사용하면 된다. 이 녀석은 Exception 대신 결과로 null 을 return 한다.
Composing suspending functions - modified
suspend fun doSomethingUsefulOne():Int{
return 13
suspend fun doSomethingUsefulTwo():Int{
return 29
val time = measureTimeMillis{
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println(“The answer is ${one + two}”)
println(“Completed in $time ms”)
// tested completed time 은 2017ms
async 는 launch 와 비슷하다.
light-weight 한 thread 를 생성해서 다른 coroutine 을 돌린다.
launch 는 job 을 return 하며 resulting value 를 받지 못한다.
반면에 async 는 deferred return 을 받는다. ( future )
asnyc 에 대해 await 를 사용해서 결과를 받을 수 있다. Deffered 역시 Job 이며, 역시나 cancel 할 수 있다.
val time = measureTimeMillis{
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println(“The answer is ${one.await() + two.await()}”)
// tested completed time 은 1017ms
async 의 lazy option 도 있다.
start param 에 CoroutineStart.LAZY 값을 주면 된다.
해당 coroutine 은 await 나 start 가 불렸을 때만 수행된다.
val time = measureTimeMillis{
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println(“The answer is ${one.await() + two.await()}”)
그러나 위의 예제는 one.await, two.await 가 불리면서 sequential 실행이 된다
fun somethingUsefulOneAsync() = async {
fun somethingUsefulTwoAsync() = async{
xxxAsync function 은 suspending function 이 아니다. (async style function)
val time = measureTimeMillis{
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
println(“The answer is ${one.await() + two.await()}”)
Coroutine context and dispatchers - modified
Coroutine 은 항상 CoroutineContext type 으로 구분된 어떤 Context 에서 실행된다. 이는 Kotlin standard library 에 정의되어 있다.
Coroutine context 는 여러개의 element 의 set 이다.
main element 는 Job 과 그 dispatcher(이 녀석은 이제부터 볼것이다.) 이다.
Coroutine dispatcher 는 어떤 thread 를 사용해서 coroutine 을 수행시킬지를 결정한다.
특정 thread 에서 수행시키거나, thread pool 에 전달하거나, 지정하지 않거나 할 수 있다.
launch, async 같은 coroutine builder 들은 optional 하게 CoroutineContext param 을 받는다.
이 param 은 coroutine 에 대한 dispatcher 또는 다른 context element 를 지정한다.
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined){ // will work with main thread, unconfined = "국한되지 않은"
println(“‘Unconfied’: I’m working in thread ${Thread.currentThread().name}”)
jobs += launch(coroutineContext){ // parent context, runBlocking coroutine
println(“‘coroutineContext’: I’m working in thread ${Thread.currentThread().name}”)
jobs += launch(CommonPool){ // ForkJoinPool.commonPool 에 dispatch 된다.
println(“‘CommonPool’: I’m working in thread ${Thread.currentThread().name}”)
jobs += launch(newSingleThreadContext(“MyOwn”Thread"){ // new thread
println(“’newSTC’: I’m working in thread ${Thread.currentThread().name}”)
// 결과는.. order 는 다를 수 있다.
// ‘Unconfined’: I’m working in thread main
// ‘CommonPool’: I’m working in thread ForkJoinPool.commonPool-worker-1
// ’newSTC’: I’m working in thread MyOwnThread
// ‘coroutineContext’: I’m working in thread main
default dispatcher 는 DefaultDispatcher 이고 이는 CommonPool 이다.
launch{ } 는 launch(DefaultDispatcher) { } 와 같고 이는 또 다시 launch(CommonPool){ } 와 같다.
newSingleThreadContext 는 매우 비싼 res 를 사용한다.
더 이상 사용되지 않을 때는 “close” function 을 사용해서 닫아주거나, top-level var 로 존재하면서 재사용되어야 한다.
Unconfined coroutine dispatcher 는 caller thread 에서 작업을 시작한다. 그러나 첫번재 suspension point 까지만이다.
suspension 후에는 suspending function 에서 정하는 thread 에서 수행한다.
Unconfined dispatcher 는 coroutine 이 CPU 를 거의 사용하지 않을 때, UI 를 update 하지 않을 때 등에 적합하다.
coroutineContext 는 CoroutineScope 를 통해 접근할 수 있는 context 를 가져온다.
즉 parent context 를 가져온다.
FIFO scheduling order 로 작업을 수행한다고 보면 된다.
Coroutine 은 Unconfined 또는 default multithreaded dispatcher 를 통해 한 thread 에서 suspend 되고, 다른 thread 에서 resume 될 수 있다.
일반적인 debugging 방법은 thread name 의 log 를 찍는 것이다.
이 기능은 logging framework 에 의해 지원된다.
thread name만으로 정보가 부족할 수 있어서, 다른 정보들도 출력해준다.
-Dkotlinx.coroutines.debug JVM option 과 함께 아래코드를 돌리면..
fun log(msg:String) = println(“[${Thread.currentThtead().name}] $msg”)
fun main(args:Array<String>) = runBlocking<Unit>{
val a = async(coroutineContext){
log(“I’m computing a piece of the answer”)
val b = async(coroutineContext){
log(“I’m computing another piece of the answer”)
log(“The answer is ${a.await() * b.await()}”)
// 결과는.. 아래와 같이 나온다는데.. 첫 2개는 main 이 아닌데서(default 인 pool 에서) 돌았어야 하는거 아닌가?...
// [main @coroutine#2] I’m computing a piece of the answer.
// [main @coroutine#3] I’m computing another piece of the answer.
// [main @coroutine#1] The answer is 42.
@coroutine#1 과 같은 녀석이 debugging mode 일 때 찍힌다.
Jumping between threads
newSingleThreadContext(“Ctx1”).use { ctx1 ->
newSingleTheradContext(“Ctx2”).user{ ctx2 ->
log(“Started in ctx1”)
log(“Working in ctx2”)
log(“Back to ctx1”)
// 결과는..
// [Ctx1 @coroutine#1] Started in ctx1
// [Ctx2 @coroutine#1] Working in ctx2
// [Ctx1 @coroutine#1] Backin ctx1
coroutine 의 Job 은 context 의 일부이다.
coroutineContext[Job] 을 통해 context 를 가져올 수 있다.
println(“My job is ${coroutineContext[Job]}”)
// 결과는
// My job is “coroutine#1”:BlockingCoroutine{Active}@6d311334
coroutineContext 가 다른 coroutine 을 실행하기 위해 쓰이면, 새로운 Job 은 parent coroutine job 의 child 가 된다.
parent coroutine 이 cancel 되면 children 도 cancel 된다.
val request = launch{
val job1 = launch{
println(“Job1: I have my own context and execute independently!”)
println(“job1: I am not affected by cancellation of the request”)
val job2 = launch(coroutineContext){
println(“job2: I am a child of the request coroutine”)
println(“job2: I will not execute this line if my parent request is cancelled”)
println(“main: Who has survived request cancellation?”)
// 결과는..
// job1: I have my own context and execute independently!
// job2 : I am a child of the request coroutine
// job1 : I am not affected by cancellation of the request
// main : Who has survived request cancellation?
Coroutine context 는 + operator 와 combine 될 수 있다.
context 우측에 있는 녀석이 왼쪽의 상응하는 녀석을 교체한다. (????)
fun main(args: Array<String>) = runBlocking<Unit> {
val request = launch(coroutineContext){
// CommonPool 에서 수행, 그러나 coroutineContext 를 물고 들어가서 request 를 cancel 하면 함께 cancel 됨
val job = launch(coroutineContext + CommonPool){
println(“job: I am a child of the request coroutine, but with a different dispatcher”)
println(“job: I will not execute this line if my parent request is cancelled”)
println(“main: Who has survived request cancellation?”)
// 결과는,..
// job: I am a child of the request coroutine, but with a different dispatcher
// main: Who has survived request cancellation?
parent coroutine 은 children 의 실행 완료를 항상 기다린다.
따라서 children 에 대해 명시적으로 Job.join 같은 것을 안 해도 된다.
자동적으로 coroutine log 에 id 가 부여되는 것도 좋지만,
specific request 를 수행할 때는 명시적으로 이름을 주는 것이 debug 용도로 좋을 수 있다.
CoroutineName context element 가 이 역할을 한다.
여기에 지정한 이름이 debugging mode 에서 동작을 한다.
val v1 = async(CoroutineName(“v1coroutine”)){
log(“Computing v1”)
val v2 = async(CoroutineName(“v2coroutine”)){
log(“Computing v2”)
log(“The answer for v1/v2 = ${v1.await() / v2.await()}”)
// 결과는..
// [ForkJoinPool.commonPool-worker1 @v1coroutine#2] Computing v1
// [ForkJoinPool.commonPool-worker1 @v2coroutine#3] Computing v2
// [main @main#1] The answer for v1 / v2 = 42
lifecycle 관리를 Job 을 통해서 할 수 있다.
val job = Job()
val coroutines = List(10) { i ->
launch(coroutineContext, parent = job){
delay((i + 1) * 200L)
println(“Coroutine $i is done”)
println(“Launched ${coroutines.size} coroutines”)
println(“Cancelling the job!”)
// 결과는..
// Launched 10 coroutines
// Coroutine 0 is done
// Coroutine 1 is done
// Cancelling the job!
Android 기준 lifecycle 이 시작할 때(onCreate) Job 을 만들고, 끝날때(onDestory) 에서 Job 에 대해 cancel 을 날리는 방향 등으로 job 을 취소할 수 있다.
Asynchronous Flow - new
Channel 은 values 의 stream 을 전달하는 역할을 한다.
Channel 은 BlockingQueue 와 개념이 비슷하다.
큰 차이는 put operation 대신 suspending “send " 를 사용하고,
take operation 대신 suspending “receive ” 를 사용한다는 것이다.
val channel = Channel<Int>()
for(x in 1..5){
channel.send(x * x) // x * x 가 엄청 오래 걸리는 job 이라고 생각하자.
Queue 와 달리 Channel 은 더 이상의 element 추가를 못 하도록 close 할 수 있다.
개념적으로 “close ” 는 특별한 close token 을 channel 에 보내는 것 이다.
iteration 은 close token 을 받는 즉시 종료하게 된다.
val channel = Channel<Int>()
for(x in 1..5) channel.send(x * x)
for(y in channel){
producer-consumer pattern 을 만들 수 있다.
produce 를 위해서는 “produce ” 라는 coroutine builder 를 사용하고, consume 을 위해서는 “consumeEach ” 를 사용하면 된다.
fun produceSquares() = produce<Int>{
for(x in 1..5) send(x * x)
} // return 은 ReceiveChannel<Int> 이다.
val squares = produceSquares()
squares.consumeEach{ println(it) }
pipeline pattern 도 가능하다.
fun produceNumbers() = produce<Int>{
var x = 1
while(true) send(x++)
fun square(numbers: ReceiveChannel<Int>) = produce<Int>{
for(x in numbers) send(x * x)
val numbers = produceNumbers()
val squares = square(numbers)
for(i in 1..5) println(squares.receive())
위의 코드에서 사실상 coroutine 은 daemon thread 같은 거라 cancel 할 필요는 없다.
그러나 큰 앱에서는 resource 관리 차원에서 cancel 해주는 것이 좋다.
coroutine pipeline 을 이용해서 소수 생성을 해보자.
fun numbersFrom(context:CoroutineContext, start:Int) = produce<Int>(context){
var x = start
while(true) send(x++)
fun filter(context:CoroutineContext, numbers:ReceiveChannel<Int>, prime:Int) = produce<Int>(context){
for(x in numbers) if (x % prime != 0) send(x)
var cur = numbersFrom(coroutineContext, 2)
for(i in 1..10){
val prime = cur.receive()
cur = filter(coroutineContext, cur, prime)
위의 코드는 standard lib 에 있는 buildIterator 를 이용해서도 만들 수 있다.
produce -> buildIterator 로 구현
send -> yield 로 구현
receive -> next 로 구현
ReceiveChannel -> Iterator 로 구현
context 제거
pipeline 을 사용하면서 CommonPool 을 사용하면 multiple CPU 를 사용하도록 할 수 있다는 장점이 있다.
여러개의 coroutine 이 same channel 로부터 값을 받을 수 있다.
fun produceNumbers() = produce<Int>{
var x = 1
fun launchProcessor(id:Int, channel:ReceiveChannel<Int>) = launch{
println(“Processor #$id received $it”)
val producer = produceNumbers()
launchProcessor(it, producer)
// 결과는 아래와 같은 식 (processor 번호 순서는 변경이 있을 수 있음)
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
반대로 여러개의 coroutine 이 한 개의 channel 에 값을 보낼 수 있다.
suspend fun sendString(channel:SendChannel<String>, s:String, time:Long){
val channel = Channel<String>()
launch(coroutineContext){ sendString(channel, “foo”, 200L) }
launch(coroutineContext){ sendString(channel, “BAR!”, 500L) }
지금까지 본 channel 들에는 buffer 가 없었다.
Unbuffered channel 은 “send” 가 먼저 invoke 되면 “receive” 가 invoke 될 때까지 suspend 되는 방식이다.
그러나 Channel 을 만들 때 capacity 값을 주면서 buffer size 를 명시하면, 여러개의 element 들을 suspending 없이 보낼 수 있다.
val channel = Channel<Int>(4)
val sender = launch(coroutineContext){
println(“Sending $it”)
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
Channel 에 대한 send 와 receive operation 은 fair 하다.
이 말은 order 가 보장된다는 말이다. ( FIFO )
receive 를 먼저 부른 녀석이 먼저 들어온 element 를 가져간다.
Shared mutable state and concurrency - modified 따로 정리하였습니다. -> [coroutine] Shared mutable state and concurrency
Coroutine 은 CommonPool 과 같은 multi-thread dispatcher 를 이용해 동시성을 가질 수 있다.
이 경우 main problem 은 shared mutable state 에 대한 접근의 synchronization 문제다.
Thread confinement 는 shared mutable state 문제에 대한 접근 방법이다.
이 녀석은 해당 shared state 가 single thread 에서 접근됨을 보장한다.
보통 UI thread 등이 confine 되는 single thread 에 해당한다.
val counterContext = newSingleThreadContext(“CounterContext”)
var counter = 0
println(“Counter = $counter”)
thread-safe 는 보장되지만 느리다.
이 구현이 훨씬 빠르긴 하다.
critical section 을 보호하는 방법은 synchronized 나 ReentrantLock 등을 쓰는 것이다.
Coroutine 에서는 Mutex 가 대응하는 녀석이다.
lock 과 unlock function 이 있다.
차이는 Mutex.lock 은 suspending function 이다. thread 를 block 하지 않는다.
val mutext = Mutex()
var counter = 0
좋은 접근법이긴 하지만 역시나 cost 가 있다.
Actor 는 coroutine, coroutine 에 encapsulation 되고 한정된 상태(state), 그리고 다른 coroutine 과 communicate 하기 위한 channel 로 구성되어 있다. ( android 의 handler 와 비슷한 개념으로 보인다. )
간단한 actor 는 function 으로 쓰인다. 그러나 복잡한 state 를 다룰 때 actor 가 더 잘 어울린다.
“actor ” coroutine builder 는 actor 의 mailbox channel 과 combine 된다.
이 mailbox channel 을 통해 message 를 받는다.
그리고 result 를 보낼 send channel 도 갖는다.
CompletableDeferred 는 future 와 같은 녀석으로 single value 를 가져온다.
sealed class CounterMsg
object IncCounter : CounterMsg() // one way msg
class GetCounter(val response:CompletableDeferred<Int>) : CounterMsg() // return 있는 msg
fun counterActor() = actor<CounterMsg>{
var counter = 0
for (msg in channel){
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
val counter = counterActor()
val response = CompletableDeferred<Int>()
println(“Counter = ${response.await()}”)
Actor 의 방식은 locking 방식에 비해 더 효율적이다.
이 방식으로 하면 다른 context 로의 전환이 없기 때문이다.
actor 는 producer 이자 consumer 이다.
Select expression - experimental (core 1.0.1 기준)
Select expression 은 여러개의 suspending function 들을 await 하고 그 중 먼저 끝나는 녀석을 선택할 수 있는 기능을 제공 한다.
fun fizz(context:CoroutineContext) = produce<String>(context){
fun buzz(context:CoroutineContext) = produce<String>(context){
suspend fun selectFizzBuzz(fizz:ReceiveChannel<String>, buzz:ReceiveChannel<String>){
fizz.onReceive{ value ->
println(“fizz -> ‘$value’”)
buzz.onReceive{ value ->
println(“buzz -> ‘$value’”)
val fizz = fizz(coroutineContext)
val buzz = buzz(coroutineContext)
selectFizzBuzz(fizz, buzz)
/* 결과는..
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!’
onReceive 는 channel 이 close 되거나 안쪽로직에서 Exception 이 나거나 하면 fail 한다.
그래서 이 대신 onReceiveOrNull 을 사용할 수 있다.
이 때는 channel 이 close 되면 value 로 null 값이 들어온다.
Select expression 은 onSend clause 를 가지고 있다.
이 녀석은 selection 의 biased 특성과 잘 어울린다.
fun produceNumbers(context:CoroutineContext, side:SendChannel<Int>) = produce<Int>(context){
for(num in 1..10){
onSend(num){ } // send to primary channel
side.onSend(num){ } // send to side channel
val side = Channel<Int>()
side.consumeEach{ println(“Side channel has $it”) }
produceNumbers(coroutineContext, side).consumeEach{
println(“Consuming $it”)
println(“Done consuming”)
/* 결과는..
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
deferred value 는 onAwait clause 를 통해 선택할 수 있다.
fun asyncString(time:Int) = async{
“Waited for %time ms"
fun asyncStringsList():List<Deferred<String>>{
val random = Random(3)
return List(12){ asyncString(random.nextInt(1000)) }
val list = asyncStringList()
val result = select<String>{
list.withIndex().forEach{ (index, deferred) ->
deferred.onAwait { answer ->
“Deferred $index produced answer ‘$answer’”
val countActive = list.count { it.isActive }
println(“$countActive coroutines are still active”)
/* 결과는.. ( 4, 128, 11 값 모두 상황에 따라 다를 수 있을 것 같다. )
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
