태터데스크 관리자

도움말
닫기
적용하기   첫페이지 만들기

태터데스크 메시지

저장하였습니다.
2019.06.07 08:03


[RxJava] #5 스케줄러


callback hell, cold observable, Computation, connectableobservable, declarative concurrency, default thread, exjava refcount, factory method, hot observable, Immediate, IO, io scheduler, multiple subscribeon, newthread, only subscribeon, Publish, recommended scheduler, refCount, rxjava observeon, rxjava publish, rxjava scheduler, rxjava share, rxjava share 이해, rxjava subscribeon, rxjava 스케줄러, rxjava 스케줄러 종류, scheduler thread count, scheduler with executor, schedulersupport.custom, share, Single, single thread scheduler, subscribeon vs observeon, test, trampolin shceduler, trampoline, [RxJava] #5 스케줄러, 선언적 비동기, 스케줄러, 콜백 지옥, 트램펄린 스케줄러


5.1. 스케줄러 개념 배우기


-

스케줄러는 스레드를 지정할 수 있게 해준다.

단순히 새로운 스레드를 생성하거나 기존의 Executors 를 활용하는 것을 넘어 새로운 방식으로 우리를 맞이한다.

그동안 어렵게 다루어야 했던 비동기 프로그래밍이 간결한 코드로 다시 태어난다.



-

String[] objs = {“1-S”, “2-T”, “3-P”};
Observable<String> source = Observable.fromArray(objs)
    .doOnNext(data -> Log.v(“Original data = “ + data))
    .subscribeOn(Schedulers.newThread()) // 구독 thread 지정
    .observeOn(Schedulers.newThread()) // 데이터 흐름이 처리될 때 스레드 지정
    .map(Shape::flip);
source.subscribe(Log::i);

결과는..

RxNewThreadScheduler-1 | Original data = 1-S

RxNewThreadScheduler-1 | Original data = 2-T

RxNewThreadScheduler-1 | Original data = 3-P

RxNewThreadScheduler-2 | value = (flipped) 1-S

RxNewThreadScheduler-2 | value = (flipped) 2-T

RxNewThreadScheduler-2 | value = (flipped) 3-P



-

1. 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정할 수 있다.

2. subscribeOn() 함수와 observeOn() 함수를 모두 지정하면 Observable 에서 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다.

3. subscribeOn() 함수만 호출하면 Observable 의 모든 흐름이 동일한 스레드에서 실행된다.

4. 스케줄러를 별도로 지정하지 않으면 현재(main) 스레드에서 동작을 실행한다.



-

observeOn 은 호출하는 순간 이후의 로직을 해당 thread 로 바꾸고,

subscribeOn 은 발행로직부터 subscribe 함수 까지의 thread 를 바꾼다.

subscribeOn 은 여러번 호출되어도 최초 호출에 대해서만 valid 하다.

observeOn 와 subscribeOn 이 함께 쓰이면.. observeOn 이 만날 때까지는 subscribeOn 에서 지정한 thread 에서 수행되고, 그 이후부터는 observeOn 을 수행한 thread 에서 수행한다.


다시 정리하자면, subscribeOn 은 발행하는 로직부터의 thread 를 지정하는 것이고,

observeOn 은 그것이 불린 순간부터의 thread 전환을 야기한다고 보면 된다.


callback hell, cold observable, Computation, connectableobservable, declarative concurrency, default thread, exjava refcount, factory method, hot observable, Immediate, IO, io scheduler, multiple subscribeon, newthread, only subscribeon, Publish, recommended scheduler, refCount, rxjava observeon, rxjava publish, rxjava scheduler, rxjava share, rxjava share 이해, rxjava subscribeon, rxjava 스케줄러, rxjava 스케줄러 종류, scheduler thread count, scheduler with executor, schedulersupport.custom, share, Single, single thread scheduler, subscribeon vs observeon, test, trampolin shceduler, trampoline, [RxJava] #5 스케줄러, 선언적 비동기, 스케줄러, 콜백 지옥, 트램펄린 스케줄러





5.2. 스케줄러의 종류


-

RxJava 의 멋진 점은 특정 스케줄러를 사용하다가 다른 스케줄러로 변경하기가 쉽다는 것.



-

Schedulers class 의 factory method 들


뉴 스레드 스케줄러 : newThread()

싱글 스레드 스케줄러 : single() ( 1.x 지원 안 함 )

계산 스케줄러 : computation()

IO 스케줄러 : io()

트램펄린 스케줄러 : trampoline()

메인 스레드 스케줄러 : X ( 1.x 에서는 immediate() )

테스트 스케줄러 : X ( 1.x 에서는 test() )




5.2.1. 뉴 스레드 스케줄러




5.2.2. 계산 스케줄러


-

RxJava 에서 추천하는 스케줄러는 크게 세 가지.

첫 번째는 계산(Computation) 스케줄러,

두번째는 IO 스케줄러,

그리고 트램펄린 스케줄러


-

@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)

위와 같이 scheduler 를 지정할 수 있는 함수들이 있고,

그 경우는 annotation 으로 CUSTOM 이 지정된다.



-

계산 스케줄러는 입출력하지 않는 스케줄러라고 생각하면 된다.

내부적으로 스레드 풀을 생성하며 스레드 개수는 기본적으로 프로세서 개수와 동일하다.






5.2.3. IO 스케줄러


-

네트워크 요청을 처리하거나, 각종 입,출력 작업, DB 작업을 실행하기 위한 스케줄러이다.

계산 스케줄러와 다른 점은 기본으로 생성되는 스레드 개수가 다르다는 것.

IO 스케줄러는 필요할 때마다 스레드를 계속 생성한다.




5.2.4. 트램펄린 스케줄러


-

새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 행렬(Queue)를 생성하는 스케줄러.

작업의 수행은 main 스레드에서 한다.




5.2.5. 싱글 스레드 스케줄러


-

단일 스레드를 별도로 생성하여 subscription 작업을 처리한다.

생성된 스레드는 여러 번 구독 요청이 와도 공통으로 사용된다.




5.2.6. Executor 변환 스케줄러


-

Executor 를 사용해서 스케줄러를 지정할 수도 있지만, 추천하는 방법은 아니다.

Schedulers.from(executor)) 를 통해 scheduler 를 얻어올 수 있다.





5.3. 스케줄러를 활용하여 콜백 지옥 벗어나기


-

Observable.just(FIRST_URL)
    .subscribeOn(Schedulers.io())
    .map(OkHttpHelper::get)
    .concatWith(
        Observable.just(SECOND_URL)
            .map(OkHttpHelper::get))
    .subscribe(Log::it);




5.4. observeOn() 함수의 활용


-

subscribeOn() 함수는 한번 호출했을 때 결정한 스레드를 고정하며 이후에는 다시 호출해도 스레드가 바뀌지 않는다.

observeOn() 은 여러 번 호출할 수 있으며 호출되면 그다음부터 동작하는 스레드를 바꿀 수 있다.



-

ConnectableObservable 클래스의 publish() 함수와 refCount() 함수를 합친 share() 함수를 사용하면, 중간과정을 share 할 수 있다.

Observable<String> source = Observable.just(URL + API_KEY)
    .map(OkHttpHelper::getWithLog)
    .subscribeOn(Schedulers.io())
    .share()
    .observeOn(Scheduler.newThread());

source.map(this::parseTemperature).subscribe(Log::it);
source.map(this::parseCityName).subscribe(Log::it);

share() 를 사용하면서 Hot observable 이 되고, subscribe 는 io 가 발생한 이후의 값을 공유한다.

돼왕 : 아래 돼왕 설명을 읽고 오면, 위 코드의 묘미를 알 수 있다.

share 를 사용하기 전 cold observable 이라면 subscribe 를 할 때마다 새롭게 발행이 수행된다. 즉 network 통신 결과값을 공유하는 것이 아니라 subscribe 순간마다 새로운 network 를 요청한다. 

그러나, share 를 호출함으로써 subscribe 된 이후에 발행이 시작된다면, 하나의 결과값을 두개의 observer 가 동시에 받을 수 있다.

그리고 혹여나 첫번째 subscribe 가 불리고 두번째 subscribe 가 불리기 전에 결과값이 도착했다면, 두번째 subscribe 는 새롭게 network 를 요청해서 결과값을 받아오게 된다. 



-

돼왕 : publish 는 cold observable -> hot observable 로 바꾸는 함수라고 이전에 배웠다. publish 의 결과물은 ConnectableObservable 이며, 이는 subscribe 로 발행되지 않고, connect 함수로 발행된다.


refCount 는 document 상으로는 ConnectableObservable 이 일반 observable 처럼 작동하게 하는 녀석이라고 한다. subscribe 를 하면 connect 를 호출함으로써 refCount 호출 이후에는 subscribe 를 하면서 바로 발행이 시작된다. 

ConnectableObservable 이 일반 observable 처럼 작동하게 한다는 의미는.. 더이상 구독하는 Observer 가 없을 때 자동으로 자신을 해지를 하고, 다시 새로운 Observer 가 오면 처음부터 다시 발행을 한다는 것이다.

(비슷한 것으로 autoConnect 가 있는데, 자동 해지는 동일하지만, 새로운 observer 가 와도 새로 시작하지 않는다.)


share 는 publish + refCount 와 같다고 한다.

즉 publish 로 ConnectableObservable 이 된 녀석을 refCount 로 일반 Observable 처럼 쓰게 하는 것이 share 의 역할이다.


callback hell, cold observable, Computation, connectableobservable, declarative concurrency, default thread, exjava refcount, factory method, hot observable, Immediate, IO, io scheduler, multiple subscribeon, newthread, only subscribeon, Publish, recommended scheduler, refCount, rxjava observeon, rxjava publish, rxjava scheduler, rxjava share, rxjava share 이해, rxjava subscribeon, rxjava 스케줄러, rxjava 스케줄러 종류, scheduler thread count, scheduler with executor, schedulersupport.custom, share, Single, single thread scheduler, subscribeon vs observeon, test, trampolin shceduler, trampoline, [RxJava] #5 스케줄러, 선언적 비동기, 스케줄러, 콜백 지옥, 트램펄린 스케줄러



share 의 예제는 아래 블로그를 참조하자.

https://javaexpert.tistory.com/797




5.5. 마치며


-

스케줄러는 ‘선언적 비동기(Declarative Concurrency)’ 로 프로그래밍 할 수 있도록 해준다

RxJava 에서 제공하는 IO, 계산, 트램펄린 스케줄러를 활용하면 개별적으로 스레드를 만들지 않고도 원하는 비동기 프로그래밍을 작성할 수 있다.




댓글을 달아 주세요


Posted by 돼지왕왕돼지