[Kotlin] Coroutines tutorial - async code 쉽게 짜기
https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
-
Kotlin coroutines 는 async code 를 sync code 의 구조를 바꾸지 않고 짤 수 있게 도와준다.
Coroutine Basics - modified 내용이 바뀌어서 새로 정리하였습니다. -> Coroutine Basics ( 코루틴 기초 )
예전 기록을 보고 싶으시면 아래 "더보기" 버튼을 눌러주세요!
더보기 접기
Hello World Coroutine
-
launch{
delay(1000L)
println(“World!”)
}
println(“Hello,”)
Thread.sleep(2000L)
// 결과는..
// Hello,
// World!
-
기본적으로 coroutine 은 light-weight thread 이다.
launch 라는 coroutine builder 를 통해 수행 된다.
launch{ … }, thread{ … } 과 비슷하고, delay(…) 은 Thread.sleep(…) 과 비슷하다.
여기서 비슷하다는 것은...
delay 함수는 non-blocking 으로 suspending function 이며, coroutine 에서만 사용될 수 있다.
Bridging blocking and non-blocking worlds
-
runBlocking { } 안에서 delay 를 수행하면 Thread.sleep 과 같은 효과를 얻을 수 있다.
-
fun main(args:Array<String>) = runBlocking<Unit>{
launch{
delay(1000L)
println(“World!”)
}
println(“Hello,”)
delay(2000L)
}
위와 같이 runBlocking{ } 을 사용할 수도 있다.
Waiting for a job
-
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
delay(1000L)
println(“World!”)
}
println(“Hello,”)
job.join()
}
다른 coroutine 이 종료될 때까지 delay 를 하는 것은 좋은 구현이 아니다.
job 을 사용해서 join() 을 이용하는 것이 추천된다.
Extract function refactoring
-
fun main(args:Array<String>) = runBlocking<Unit>{
val job = launch { doWorld() }
println(“Hello,”)
job.join()
}
suspend fun doWorld(){
delay(1000L)
println(“World!”)
}
suspending function 은 다른 일반 function 과 같이 쓰일 수 있다.
하지만 suspend function 은 delay 와 같은 suspending function 을 쓸 수 있다.
Coroutines ARE light-weight
-
val jobs = List(100_000){
launch{
delay(1000L)
print(“.”)
}
}
jobs.forEach{ it.join() }
위 구문을 thread 로 처리했다면 OOM 이 났을 것이다.
Coroutines are like daemon threads
-
launch{
repeat(1000){ i ->
println(“I’m sleeping $i …”)
delay(500L)
}
}
delay(1300L)
// output
// I’m sleeping 0 …
// I’m sleeping 1 …
// I’m sleeping 2 …
3개만 프린트하고 끝난다.
Active coroutine 은 process 를 alive 상태로 두지 않고 , daemon thread 같이 작동 한다.
cf) 데몬쓰레드는 일반 쓰레드(main 등)가 모두 종료되면 강제적으로 종료되는 특징을 가지고 있다.
접기
Cancellation and timeouts - modified
예전 기록을 보고 싶으시면 아래 "더보기" 버튼을 눌러주세요!
더보기 접기
-
val job = launch{
repeat(1000){ i ->
println(“I’m sleeping $i …”)
delay(500L)
}
delay(1300L)
println(“main: I’m tired of waiting!”)
job.cancel()
job.join() // job.cancelAndJoin 을 호출해도 된다.
println(“main: Now I can quit.”)
// output
// I'm sleeping 0 ...
// I'm sleeping 1 ...
// I'm sleeping 2 ...
// main: I'm tired of waiting!
// main: Now I can quit.
-
모든 suspending function 은 cancellable 하다.
suspending function 은 coroutine 의 cancel 을 확인하고 CancellationException 을 던진다.
그러나 다음과 같은 경우는 suspending function 이 아닌 단순 computation 이기 떄문에 cancel 이 안 된다.
val startTime = System.currentTimeMillis()
val job = launch{
var nextPrintTime = startTime
var i = 0
while( i < 5 ){
if(System.currentTimeMillis() >= nextPrintTime){
println(“I’m sleeping ${i++} …”)
nextPrintTime += 500L
}
}
}
delay(1300L)
println(“main: I’m tired of waiting!”)
job.cancelAndJoin()
println(“main: Now I can quit.”)
-
위와 같은 computation code 를 cancellable 로 만들어보자.
첫번째 방법은 suspending function 을 사용한다. 여기에 맞는 suspending function 은 “yield” 이다.
다른 방법은 매번 cancellation status 를 체크하는 것이다.
val job = launch{
var nextPrintTime = startTime
var i = 0
while( isActive ){
if(System.currentTimeMillis() >= nextPrintTime){
println(“I’m sleeping ${i++} …”)
nextPrintTime += 500L
}
}
}
isActive 는 coroutine 의 cancel 상태를 확인한다.
-
Cancaellable suspending function 은 CancellationException 을 던진다.
이때를 확인하기 위해 try, finally 구문을 사용할 수 있다.
-
위의 케이스(cancel 로 CancellatationException 이 발생하면)의 finally 블럭에서 다시 suspending function 을 사용하면 CancellationException 이 바로 또 발생한다.
보통은 문제가 되지 않지만, 몇몇 케이스에 suspending function 을 사용해야 한다면 withContext 를 사용해야 한다.
val job = launch{
try{
repeat(1000){ i ->
println(“I’m sleeping $i …”)
delay(500L)
}
}finally{
withContext(NonCancellable){
println(“I’m running finally”)
delay(1000L)
println(“And I’ve just delayed for 1 sec because I’m non-cancellable”)
}
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("main: Now I can quit.")
-
job 을 두고 직접 cancel 하는 것이 아닌 Timeout 을 설정하는 방법도 있다.
withTimeout(1300L){
repeat(1000){ i ->
println(“I’m sleeping $i …”)
delay(500L)
}
}
TimeoutCancellationException 이 발생한다. (CancellationException 의 subclass)
coroutine 에서 CancellationException 은 일반적인 coroutine 의 종료로 보는데, main 에서 수행한 withTimeout 의 TimeoutCancellationException 은 단순 exception 으로 처리된다.
resource 를 정리할 것이 있다면 try, catch 를 사용해야 하고,
그렇지 않으면. withTimeoutOrNull 을 사용하면 된다. 이 녀석은 Exception 대신 결과로 null 을 return 한다.
접기
Composing suspending functions - modified
예전 기록을 보고 싶으시면 아래 "더보기" 버튼을 눌러주세요!
더보기 접기
-
suspend fun doSomethingUsefulOne():Int{
delay(1000L)
return 13
}
suspend fun doSomethingUsefulTwo():Int{
delay(1000L)
return 29
}
val time = measureTimeMillis{
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println(“The answer is ${one + two}”)
}
println(“Completed in $time ms”)
// tested completed time 은 2017ms
-
async 는 launch 와 비슷하다.
light-weight 한 thread 를 생성해서 다른 coroutine 을 돌린다.
launch 는 job 을 return 하며 resulting value 를 받지 못한다.
반면에 async 는 deferred return 을 받는다. ( future )
asnyc 에 대해 await 를 사용해서 결과를 받을 수 있다. Deffered 역시 Job 이며, 역시나 cancel 할 수 있다.
val time = measureTimeMillis{
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println(“The answer is ${one.await() + two.await()}”)
}
// tested completed time 은 1017ms
-
async 의 lazy option 도 있다.
start param 에 CoroutineStart.LAZY 값을 주면 된다.
해당 coroutine 은 await 나 start 가 불렸을 때만 수행된다.
val time = measureTimeMillis{
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println(“The answer is ${one.await() + two.await()}”)
}
그러나 위의 예제는 one.await, two.await 가 불리면서 sequential 실행이 된다
-
fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}
fun somethingUsefulTwoAsync() = async{
doSomethingUsefulTwo()
}
xxxAsync function 은 suspending function 이 아니다. (async style function)
val time = measureTimeMillis{
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
runBlocking{
println(“The answer is ${one.await() + two.await()}”)
}
}
접기
Coroutine context and dispatchers - modified
예전 기록을 보고 싶으시면 아래 "더보기" 버튼을 눌러주세요!
더보기 접기
-
Coroutine 은 항상 CoroutineContext type 으로 구분된 어떤 Context 에서 실행된다. 이는 Kotlin standard library 에 정의되어 있다.
Coroutine context 는 여러개의 element 의 set 이다.
main element 는 Job 과 그 dispatcher(이 녀석은 이제부터 볼것이다.) 이다.
-
Coroutine dispatcher 는 어떤 thread 를 사용해서 coroutine 을 수행시킬지를 결정한다.
특정 thread 에서 수행시키거나, thread pool 에 전달하거나, 지정하지 않거나 할 수 있다.
launch, async 같은 coroutine builder 들은 optional 하게 CoroutineContext param 을 받는다.
이 param 은 coroutine 에 대한 dispatcher 또는 다른 context element 를 지정한다.
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined){ // will work with main thread, unconfined = "국한되지 않은"
println(“‘Unconfied’: I’m working in thread ${Thread.currentThread().name}”)
}
jobs += launch(coroutineContext){ // parent context, runBlocking coroutine
println(“‘coroutineContext’: I’m working in thread ${Thread.currentThread().name}”)
}
jobs += launch(CommonPool){ // ForkJoinPool.commonPool 에 dispatch 된다.
println(“‘CommonPool’: I’m working in thread ${Thread.currentThread().name}”)
}
jobs += launch(newSingleThreadContext(“MyOwn”Thread"){ // new thread
println(“’newSTC’: I’m working in thread ${Thread.currentThread().name}”)
}
}
// 결과는.. order 는 다를 수 있다.
// ‘Unconfined’: I’m working in thread main
// ‘CommonPool’: I’m working in thread ForkJoinPool.commonPool-worker-1
// ’newSTC’: I’m working in thread MyOwnThread
// ‘coroutineContext’: I’m working in thread main
-
default dispatcher 는 DefaultDispatcher 이고 이는 CommonPool 이다.
launch{ } 는 launch(DefaultDispatcher) { } 와 같고 이는 또 다시 launch(CommonPool){ } 와 같다.
-
newSingleThreadContext 는 매우 비싼 res 를 사용한다.
더 이상 사용되지 않을 때는 “close” function 을 사용해서 닫아주거나, top-level var 로 존재하면서 재사용되어야 한다.
-
Unconfined coroutine dispatcher 는 caller thread 에서 작업을 시작한다. 그러나 첫번재 suspension point 까지만이다.
suspension 후에는 suspending function 에서 정하는 thread 에서 수행한다.
Unconfined dispatcher 는 coroutine 이 CPU 를 거의 사용하지 않을 때, UI 를 update 하지 않을 때 등에 적합하다.
coroutineContext 는 CoroutineScope 를 통해 접근할 수 있는 context 를 가져온다.
즉 parent context 를 가져온다.
FIFO scheduling order 로 작업을 수행한다고 보면 된다.
-
Coroutine 은 Unconfined 또는 default multithreaded dispatcher 를 통해 한 thread 에서 suspend 되고, 다른 thread 에서 resume 될 수 있다.
일반적인 debugging 방법은 thread name 의 log 를 찍는 것이다.
이 기능은 logging framework 에 의해 지원된다.
thread name만으로 정보가 부족할 수 있어서, 다른 정보들도 출력해준다.
-Dkotlinx.coroutines.debug JVM option 과 함께 아래코드를 돌리면..
fun log(msg:String) = println(“[${Thread.currentThtead().name}] $msg”)
fun main(args:Array<String>) = runBlocking<Unit>{
val a = async(coroutineContext){
log(“I’m computing a piece of the answer”)
6
}
val b = async(coroutineContext){
log(“I’m computing another piece of the answer”)
7
}
log(“The answer is ${a.await() * b.await()}”)
}
// 결과는.. 아래와 같이 나온다는데.. 첫 2개는 main 이 아닌데서(default 인 pool 에서) 돌았어야 하는거 아닌가?...
// [main @coroutine#2] I’m computing a piece of the answer.
// [main @coroutine#3] I’m computing another piece of the answer.
// [main @coroutine#1] The answer is 42.
@coroutine#1 과 같은 녀석이 debugging mode 일 때 찍힌다.
-
Jumping between threads
newSingleThreadContext(“Ctx1”).use { ctx1 ->
newSingleTheradContext(“Ctx2”).user{ ctx2 ->
runBlocking(ctx1){
log(“Started in ctx1”)
withContext(ctx2){
log(“Working in ctx2”)
}
log(“Back to ctx1”)
}
}
}
// 결과는..
// [Ctx1 @coroutine#1] Started in ctx1
// [Ctx2 @coroutine#1] Working in ctx2
// [Ctx1 @coroutine#1] Backin ctx1
-
coroutine 의 Job 은 context 의 일부이다.
coroutineContext[Job] 을 통해 context 를 가져올 수 있다.
println(“My job is ${coroutineContext[Job]}”)
// 결과는
// My job is “coroutine#1”:BlockingCoroutine{Active}@6d311334
-
coroutineContext 가 다른 coroutine 을 실행하기 위해 쓰이면, 새로운 Job 은 parent coroutine job 의 child 가 된다.
parent coroutine 이 cancel 되면 children 도 cancel 된다.
val request = launch{
val job1 = launch{
println(“Job1: I have my own context and execute independently!”)
delay(1000)
println(“job1: I am not affected by cancellation of the request”)
}
val job2 = launch(coroutineContext){
delay(100)
println(“job2: I am a child of the request coroutine”)
delay(1000)
println(“job2: I will not execute this line if my parent request is cancelled”)
}
job1.join()
job2.join()
}
delay(500)
request.cancel()
delay(1000)
println(“main: Who has survived request cancellation?”)
// 결과는..
// job1: I have my own context and execute independently!
// job2 : I am a child of the request coroutine
// job1 : I am not affected by cancellation of the request
// main : Who has survived request cancellation?
-
Coroutine context 는 + operator 와 combine 될 수 있다.
context 우측에 있는 녀석이 왼쪽의 상응하는 녀석을 교체한다. (????)
fun main(args: Array<String>) = runBlocking<Unit> {
val request = launch(coroutineContext){
// CommonPool 에서 수행, 그러나 coroutineContext 를 물고 들어가서 request 를 cancel 하면 함께 cancel 됨
val job = launch(coroutineContext + CommonPool){
println(“job: I am a child of the request coroutine, but with a different dispatcher”)
delay(1000)
println(“job: I will not execute this line if my parent request is cancelled”)
}
job.join()
}
delay(500)
request.cancel()
delay(1000)
println(“main: Who has survived request cancellation?”)
}
// 결과는,..
// job: I am a child of the request coroutine, but with a different dispatcher
// main: Who has survived request cancellation?
-
parent coroutine 은 children 의 실행 완료를 항상 기다린다.
따라서 children 에 대해 명시적으로 Job.join 같은 것을 안 해도 된다.
-
자동적으로 coroutine log 에 id 가 부여되는 것도 좋지만,
specific request 를 수행할 때는 명시적으로 이름을 주는 것이 debug 용도로 좋을 수 있다.
CoroutineName context element 가 이 역할을 한다.
여기에 지정한 이름이 debugging mode 에서 동작을 한다.
val v1 = async(CoroutineName(“v1coroutine”)){
delay(500)
log(“Computing v1”)
252
}
val v2 = async(CoroutineName(“v2coroutine”)){
delay(1000)
log(“Computing v2”)
6
}
log(“The answer for v1/v2 = ${v1.await() / v2.await()}”)
// 결과는..
// [ForkJoinPool.commonPool-worker1 @v1coroutine#2] Computing v1
// [ForkJoinPool.commonPool-worker1 @v2coroutine#3] Computing v2
// [main @main#1] The answer for v1 / v2 = 42
-
lifecycle 관리를 Job 을 통해서 할 수 있다.
val job = Job()
val coroutines = List(10) { i ->
launch(coroutineContext, parent = job){
delay((i + 1) * 200L)
println(“Coroutine $i is done”)
}
}
println(“Launched ${coroutines.size} coroutines”)
delay(500L)
println(“Cancelling the job!”)
job.cancelAndJoin()
// 결과는..
// Launched 10 coroutines
// Coroutine 0 is done
// Coroutine 1 is done
// Cancelling the job!
-
Android 기준 lifecycle 이 시작할 때(onCreate) Job 을 만들고, 끝날때(onDestory) 에서 Job 에 대해 cancel 을 날리는 방향 등으로 job 을 취소할 수 있다.
접기
Asynchronous Flow - new
Exception handling and supervision - new 따로 정리하였습니다. -> [coroutine] Exception handling and supervision
Channels - modified ( now not experimental! ) 따로 정리하였습니다. -> [coroutine] Channels
예전 기록을 보고 싶으시면 아래 "더보기" 버튼을 눌러주세요!
더보기 접기
-
Channel 은 values 의 stream 을 전달하는 역할을 한다.
Channel 은 BlockingQueue 와 개념이 비슷하다.
큰 차이는 put operation 대신 suspending “send " 를 사용하고,
take operation 대신 suspending “receive ” 를 사용한다는 것이다.
val channel = Channel<Int>()
launch{
for(x in 1..5){
channel.send(x * x) // x * x 가 엄청 오래 걸리는 job 이라고 생각하자.
}
}
repeat(5){
println(channel.receive()){
}
println(“Done!”)
-
Queue 와 달리 Channel 은 더 이상의 element 추가를 못 하도록 close 할 수 있다.
개념적으로 “close ” 는 특별한 close token 을 channel 에 보내는 것 이다.
iteration 은 close token 을 받는 즉시 종료하게 된다.
val channel = Channel<Int>()
launch{
for(x in 1..5) channel.send(x * x)
channel.close()
}
for(y in channel){
println(y)
}
println(“Done!”)
-
producer-consumer pattern 을 만들 수 있다.
produce 를 위해서는 “produce ” 라는 coroutine builder 를 사용하고, consume 을 위해서는 “consumeEach ” 를 사용하면 된다.
fun produceSquares() = produce<Int>{
for(x in 1..5) send(x * x)
} // return 은 ReceiveChannel<Int> 이다.
val squares = produceSquares()
squares.consumeEach{ println(it) }
println(“Done!”)
-
pipeline pattern 도 가능하다.
fun produceNumbers() = produce<Int>{
var x = 1
while(true) send(x++)
}
fun square(numbers: ReceiveChannel<Int>) = produce<Int>{
for(x in numbers) send(x * x)
}
val numbers = produceNumbers()
val squares = square(numbers)
for(i in 1..5) println(squares.receive())
println(“Done!”)
squares.cancel()
numbers.cancel()
위의 코드에서 사실상 coroutine 은 daemon thread 같은 거라 cancel 할 필요는 없다.
그러나 큰 앱에서는 resource 관리 차원에서 cancel 해주는 것이 좋다.
-
coroutine pipeline 을 이용해서 소수 생성을 해보자.
fun numbersFrom(context:CoroutineContext, start:Int) = produce<Int>(context){
var x = start
while(true) send(x++)
}
fun filter(context:CoroutineContext, numbers:ReceiveChannel<Int>, prime:Int) = produce<Int>(context){
for(x in numbers) if (x % prime != 0) send(x)
}
var cur = numbersFrom(coroutineContext, 2)
for(i in 1..10){
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren()
-
위의 코드는 standard lib 에 있는 buildIterator 를 이용해서도 만들 수 있다.
produce -> buildIterator 로 구현
send -> yield 로 구현
receive -> next 로 구현
ReceiveChannel -> Iterator 로 구현
context 제거
pipeline 을 사용하면서 CommonPool 을 사용하면 multiple CPU 를 사용하도록 할 수 있다는 장점이 있다.
-
여러개의 coroutine 이 same channel 로부터 값을 받을 수 있다.
fun produceNumbers() = produce<Int>{
var x = 1
while(true){
send(x++)
delay(100)
}
}
fun launchProcessor(id:Int, channel:ReceiveChannel<Int>) = launch{
channel.consumeEach{
println(“Processor #$id received $it”)
}
}
val producer = produceNumbers()
repeat(5){
launchProcessor(it, producer)
}
delay(950)
producer.cancel()
// 결과는 아래와 같은 식 (processor 번호 순서는 변경이 있을 수 있음)
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
-
반대로 여러개의 coroutine 이 한 개의 channel 에 값을 보낼 수 있다.
suspend fun sendString(channel:SendChannel<String>, s:String, time:Long){
while(true){
delay(time)
channel.send(s)
}
}
val channel = Channel<String>()
launch(coroutineContext){ sendString(channel, “foo”, 200L) }
launch(coroutineContext){ sendString(channel, “BAR!”, 500L) }
repeat(6){
println(channel.receive())
}
coroutineContext.cancelChildren()
/*
결과는..
foo
foo
BAR!
foo
foo
BAR!
*/
-
지금까지 본 channel 들에는 buffer 가 없었다.
Unbuffered channel 은 “send” 가 먼저 invoke 되면 “receive” 가 invoke 될 때까지 suspend 되는 방식이다.
그러나 Channel 을 만들 때 capacity 값을 주면서 buffer size 를 명시하면, 여러개의 element 들을 suspending 없이 보낼 수 있다.
val channel = Channel<Int>(4)
val sender = launch(coroutineContext){
repeat(10){
println(“Sending $it”)
channel.send(it)
}
}
delay(1000)
sender.cancel()
/*
결과는..
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
*/
-
Channel 에 대한 send 와 receive operation 은 fair 하다.
이 말은 order 가 보장된다는 말이다. ( FIFO )
receive 를 먼저 부른 녀석이 먼저 들어온 element 를 가져간다.
접기
Shared mutable state and concurrency - modified 따로 정리하였습니다. -> [coroutine] Shared mutable state and concurrency
예전 기록을 보고 싶으시면 아래 "더보기" 버튼을 눌러주세요!
더보기 접기
-
Coroutine 은 CommonPool 과 같은 multi-thread dispatcher 를 이용해 동시성을 가질 수 있다.
이 경우 main problem 은 shared mutable state 에 대한 접근의 synchronization 문제다.
-
Thread confinement 는 shared mutable state 문제에 대한 접근 방법이다.
이 녀석은 해당 shared state 가 single thread 에서 접근됨을 보장한다.
보통 UI thread 등이 confine 되는 single thread 에 해당한다.
val counterContext = newSingleThreadContext(“CounterContext”)
var counter = 0
massiveRun(CommonPool){
withContext(counterContext){
counter++
}
}
println(“Counter = $counter”)
thread-safe 는 보장되지만 느리다.
-
massiveRun(counterContext){
counter++
}
이 구현이 훨씬 빠르긴 하다.
-
critical section 을 보호하는 방법은 synchronized 나 ReentrantLock 등을 쓰는 것이다.
Coroutine 에서는 Mutex 가 대응하는 녀석이다.
lock 과 unlock function 이 있다.
차이는 Mutex.lock 은 suspending function 이다. thread 를 block 하지 않는다.
val mutext = Mutex()
var counter = 0
massiveRun(CommonPool){
mutex.withLock{
counter++
}
}
좋은 접근법이긴 하지만 역시나 cost 가 있다.
-
Actor 는 coroutine, coroutine 에 encapsulation 되고 한정된 상태(state), 그리고 다른 coroutine 과 communicate 하기 위한 channel 로 구성되어 있다. ( android 의 handler 와 비슷한 개념으로 보인다. )
간단한 actor 는 function 으로 쓰인다. 그러나 복잡한 state 를 다룰 때 actor 가 더 잘 어울린다.
“actor ” coroutine builder 는 actor 의 mailbox channel 과 combine 된다.
이 mailbox channel 을 통해 message 를 받는다.
그리고 result 를 보낼 send channel 도 갖는다.
CompletableDeferred 는 future 와 같은 녀석으로 single value 를 가져온다.
sealed class CounterMsg
object IncCounter : CounterMsg() // one way msg
class GetCounter(val response:CompletableDeferred<Int>) : CounterMsg() // return 있는 msg
fun counterActor() = actor<CounterMsg>{
var counter = 0
for (msg in channel){
when(msg){
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
val counter = counterActor()
massiveRun(CommonPool){
counter.send(IncCounter)
}
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println(“Counter = ${response.await()}”)
counter.close()
-
Actor 의 방식은 locking 방식에 비해 더 효율적이다.
이 방식으로 하면 다른 context 로의 전환이 없기 때문이다.
-
actor 는 producer 이자 consumer 이다.
접기
Select expression - experimental (core 1.0.1 기준)
-
Select expression 은 여러개의 suspending function 들을 await 하고 그 중 먼저 끝나는 녀석을 선택할 수 있는 기능을 제공 한다.
-
fun fizz(context:CoroutineContext) = produce<String>(context){
while(true){
delay(300)
send(“Fizz”)
}
}
fun buzz(context:CoroutineContext) = produce<String>(context){
while(true){
delay(500)
send(“Buzz!”)
}
}
suspend fun selectFizzBuzz(fizz:ReceiveChannel<String>, buzz:ReceiveChannel<String>){
select<Unit>{
fizz.onReceive{ value ->
println(“fizz -> ‘$value’”)
}
buzz.onReceive{ value ->
println(“buzz -> ‘$value’”)
}
}
}
val fizz = fizz(coroutineContext)
val buzz = buzz(coroutineContext)
repeat(7){
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren()
/* 결과는..
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!’
*/
-
onReceive 는 channel 이 close 되거나 안쪽로직에서 Exception 이 나거나 하면 fail 한다.
그래서 이 대신 onReceiveOrNull 을 사용할 수 있다.
이 때는 channel 이 close 되면 value 로 null 값이 들어온다.
-
Select expression 은 onSend clause 를 가지고 있다.
이 녀석은 selection 의 biased 특성과 잘 어울린다.
fun produceNumbers(context:CoroutineContext, side:SendChannel<Int>) = produce<Int>(context){
for(num in 1..10){
delay(100)
select<Unit>{
onSend(num){ } // send to primary channel
side.onSend(num){ } // send to side channel
}
}
}
val side = Channel<Int>()
launch(coroutineContext){
side.consumeEach{ println(“Side channel has $it”) }
}
produceNumbers(coroutineContext, side).consumeEach{
println(“Consuming $it”)
delay(250)
}
println(“Done consuming”)
coroutineContext.cancelChildren()
/* 결과는..
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
*/
-
deferred value 는 onAwait clause 를 통해 선택할 수 있다.
fun asyncString(time:Int) = async{
delay(time.toLong())
“Waited for %time ms"
}
fun asyncStringsList():List<Deferred<String>>{
val random = Random(3)
return List(12){ asyncString(random.nextInt(1000)) }
}
val list = asyncStringList()
val result = select<String>{
list.withIndex().forEach{ (index, deferred) ->
deferred.onAwait { answer ->
“Deferred $index produced answer ‘$answer’”
}
}
}
println(result)
val countActive = list.count { it.isActive }
println(“$countActive coroutines are still active”)
/* 결과는.. ( 4, 128, 11 값 모두 상황에 따라 다를 수 있을 것 같다. )
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
*/
더보기 접기
\actor, Async, async code 쉽게 짜기, await, Bridging blocking and non-blocking worlds, buffered channel, cancel and join, cancellable, cancellationexception, CommonPool, CompletableDeferred, Composing suspending functions, consumeEach, coroutine builder, coroutine channels, Coroutine dispatcher, coroutine example, coroutine hello world, coroutine parent assign, coroutine tutorial, CoroutineContext, CoroutineName, Coroutines are like daemon threads, CoroutineStart.LAZY, default dispatcher, DefaultDispatcher, deferred return, deffered, delay, Extract function refactoring, ForkJoinPool, hello world coroutine, isActive, Job, job cancel, job timeout, Join, jvm option, kotlin coroutine, kotlinx.coroutines.debug, launch, launch return, launch vs thread, Lazy, light-weight thread, mutex, newSingleThreadContext, NonCancellable, onAwait, Parent, pipeline pattern, produce, producer-consumer pattern, ReentrantLock, runBlocking, Select expression, Shared mutable state and concurrency, sleep, sleep vs delay, start param, suspend function, suspending function, suspending receiver, suspending send, Synchronization, synchronized, thread, Thread confinement, thread pool, thread 지정, TimeoutCancellationException, unbeffered channel, Unconfined, Waiting for a job, withContext, withTimeout, withTimeoutOrNull, yield, [Kotlin] Coroutines tutorial - async code 쉽게 짜기, 미지정, 코루틴, 코루틴 hello world
접기
댓글