7.1. 디버깅
7.1.1. doOnNext(), doOnComplete(), doOnError() 함수
-
onNext, onError, onComplete 에 대해 doOnNext(), doOnComplete(), doOnError() 가 매핑된다.
7.1.2. doOnEach() 함수
-
doOnEach 는 onNext, onComplete, onError 이벤트를 한번에 처리할 수 있어 편하다.
noti.isOnNext(), noti.isOnComplete(), noti.isOnError() 를 통해 상태를 조회할 수 있다.
onNext() 함수의 경우 getValue() 호출하면 발행한 값을 얻을 수 있고,
onError() 함수의 경우 getError() 함수를 호출하면 Throwable 객체를 얻어올 수 있다.
7.1.3. doOnSubscribe(), doOnDispose(), 기타 함수
-
Observable 의 알림 이벤트 중에는 OnSubscribe 와 OnDispose 이벤트도 있다.
각각 Observable 을 구독했을 때와 구독 해지했을 떄의 이벤트를 처리할 수 있다.
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe) public final Observable<T> doOnDispose(Action onDispose)
-
doOnDispose 의 Action 객체는 thread-safe 하게 동작해야 한다.
-
doOnSubscribe 와 doOnDispose 함수를 각각 호출하지 않고 한꺼번에 호출하는 함수인 doOnLifecycle() 도 있다.
-
doOnTerminate() 함수는 Observable 이 끝나는 조건인 onComplete 혹은 onError 이벤트가 발생했을 때 실행하는 함수이다.
정확히는 onComplete() 혹은 onError 이벤트 발생 직전에 호출된다.
7.2. 예외 처리
-
try-catch 문은 RxJava 에서 활용할 수 없다.
추가로 함수 체인이나 Observable 내부에서 예외가 발생해도 onError 이벤트가 발생하고 try-catch 문으로는 해결할 수 없다.
즉, RxJava 에서는 다른 방식의 예외 처리 방법을 제공한다.
7.2.1. onErrorReturn() 함수
-
RxJava 에서는 에러도 어떠한 데이터로 보는 것이 적절하다.
따라서 예외 처리하는 첫 번째 방식은 예외가 발생했을 때 에러를 의미하는 다른 데이터로 대체하는 것이다.
onError 이벤트는 데이터 흐름이 바로 중단되므로, subscribe() 함수를 호출할 때 onError 이벤트를 처리하는 것은 Out Of Memory 같은 중대한 에러가 발생했을 때만 활용한다.
-
Observable.fromArray(grades) .map(data -> Integer.parseInt(data)) .onErrorReturn(e -> { if(e instanceof NumberFormatException){ e.printStackTrace(); } return -1; }) .subscribe(data -> { if( data < 0 ){ Log.e(“Wrong Data found!!”); return; } Log.i(“Grade is “ + data); });
-
onError 대비 onErrorReturn 함수는 장점이 있다.
1. 예외 발생이 예상되는 부분을 선언적으로 처리할 수 있다.
2. Observable 을 생성하는 측과 구독하는 측이 서로 다를 수 있다.
구독하는 Observable 에서 발생할 수 있는 예외를 구독한 이후에 모두 파악하는 것이 어렵다. Observable 에서는 에러 가능성을 명시하지 않았는데 구독자가 필요한 예외 처리를 빠짐없이 하는 것이 어렵다는 뜻이다. Observable 을 생성하는 측에서 발생할 수 있는 예외 처리를 미리 해두면 구독자는 선언된 예외 상황을 보고 그에 맞는 처리를 할 수 있다.
-
onErrorReturnItem() 도 있다.
onErrorReturn() 함수와 동일하지만 Throwable 객체를 인자로 전달하지 않기 때문에 코드가 더 간결해진다. 단, 예외 종류는 확인할 수 없다.
7.2.2. onErrorResumeNext() 함수
-
onErrorReturn 과 onErrorReturnItem 함수는 에러가 발생한 시점에 특정 값으로 대체하는 것이었다.
onErrorResumeNext 는 에러가 발생했을 때 내가 원하는 Observable 로 대체하는 방법이다.
이는 에러 발생 시 데이터를 교체하는 것뿐만 아니라 관리자에게 이메일을 보내거나 자원을 해제하는 등의 추가 작업을 할 때 유용하다.
-
Observable<Integer> onParseError = Observable.defer( () -> { Log.d(“send email to administrator”); return Observable.just(-1); } .subscribeOn(Schedulers.io()); Observable.fromArray(salesData) .map(Integer::parseInt) .onErrorResumeNext(onParseError) .subscribe(data -> { if(data < 0){ return; } Log.i(“Sales data : “ + data); }
7.2.3. retry() 함수
-
retry() 함수는 Observable 에서 onError 이벤트가 발생하면 바로 다시 subscribe() 함수를 호출하여 재구독하게 되어 있다.
여러가지 오버로딩 함수들이 있어 retry 에 대한 자세한 설정을 할 수 있다.
-
Observable.just(url) .map(OkHttpHelper::getT) .retry( (retryCnt, e) -> { if(retryCnt < RETRY_MAX){ CommUtils.sleep(RETRY_DELAY); return true; } return false; }) .onErrorReturn(e -> CommUtils.ERROR_CODE) .subscribe(Log::i);
7.2.4. retryUntil() 함수
-
특정 조건이 충족할 때까지 재시도하는 함수이다.
Observable.just(url) .map(OkHttpHelper::getT) .subscribeOn(Schedulers.io()) .retryUntil( () -> { if(CommUtils.isNetworkAvailable()){ return true; // 인터넷 가능한데도 실패했다면, 중지 } CommUtils.sleep(1000); return false; // 인터넷이 불가능했다면, 잠시 후 다시 시도 } ) .subscribe(Log::i);
7.2.5. retryWhen() 함수
-
Observable .create( (ObservableEmitter<String> emitter) -> { emitter.onError(new RuntimeException(“always fails”)); } .retryWhen( Observable attempts -> { return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> { return Observable.timer(i, TimeUnit.SECONDS); } } .blockingForEach(Log::d); // 3번 delay 재시도를 한다.
7.3. 흐름 제어
-
흐름 제어는 Observable 이 데이터를 발행하는 속도와 옵저버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때 사용하는 함수이다.
7.3.1. sample() 함수
-
특정한 시간 동안 가장 최근에 발행된 데이터만 걸러준다.
해당 시간에는 아무리 많은 데이터가 들어와도 해당 구간의 마지막 데이터만 발행하고 나머지는 무시한다.
-
String[] data = {"1", "7", "2", "3", "6"}; Observable<String> earlySource = Observable.fromArray(data) .take(4) .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> lateSource = Observable.just(data[4]) .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable.concat(earlySource, lateSource) .sample(300L, TimeUnit.MILLISECONDS) // 마지막 인자(emitLast)에 true 를 주면 최종값도 사용한다. .subscribe(Log::it);
결과는..
RxComputationThreadPool-1 | 543 | value = 7
RxComputationThreadPool-1 | 843 | value = 3
7.3.2. buffer() 함수
-
sample() 함수는 특정 시간 간격을 기준으로 가장 최근에 발행된 데이터만 넘겨주고 나머지는 무시하는 반면 buffer() 함수는 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해준다.
-
String[] data = {"1", "2", "3", "4", "5", "6"}; Observable<String> earlySource = Observable.fromArray(data) .take(3) .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> middleSource = Observable.just(data[3]) .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable<String> lateSource = Observable.just(data[4], data[5]) .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable.concat(earlySource, middleSource, lateSource) .buffer(3) // 3개씩 전달한다. .subscribe(Log::it);
결과는…
RxComputationThreadPool-1 | 506 | value = [1, 2, 3]
RxComputationThreadPool-3 | 1013 | value = [4, 5, 6]
-
buffer 의 두번째 인자는 skip 인데.. 첫번째 인자인 count 값보다 커야 한다.
예를 들어 (2,3) 으로 전달하면 2개를 받고, 3번째 녀석은 무시한다.
7.3.3. throttleFirst() 와 throttleLast() 함수
-
throttleFirst() 는 주어진 조건에서 가장 먼저 입력된 값을 발행한다.
throttleLast() 는 주어진 조건에서 가장 마지막에 입력된 값을 발행한다.
윈도우 shifting 하면서 첫번째 혹은 마지막 값을 전달한다고 보면 된다.
-
String[] data = {"1", "2", "3", "4", "5", "6"}; Observable<String> earlySource = Observable.just(data[0]) .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a,b) -> a); Observable<String> middleSource = Observable.just(data[1]) .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a,b) -> a); Observable<String> lateSource = Observable.just(data[2], data[3], data[4], data[5]) .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable.concat(earlySource, middleSource, lateSource) .throttleFirst(200L, TimeUnit.MILLISECONDS) .subscribe(Log::it);
결과는..
RxComputationThreadPool-1 | 305 | value = 1
RxComputationThreadPool-3 | 609 | value = 2
RxComputationThreadPool-4 | 305 | value = 4
RxComputationThreadPool-4 | 305 | value = 6
7.3.4. windows() 함수
-
groupBy() 함수와 개념적으로 비슷하다.
groupBy() 함수는 특정 조건에 맞는 입력값들을 그룹화해 별도의 Observable 을 병렬로 만든다.
반면 window() 함수는 throttleFirst() 나 sample() 함수처럼 내가 처리할 수 있는 일부의 값들만 받아들일 수 있다.
-
@SchedulerSupport(SchedulerSupport.NONE) public final Observable<Observable<T>> window(long count) @SchedulerSupport(SchedulerSupport.COMPUTATION) public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)
-
String[] data = {"1", "2", "3", "4", "5", "6"}; Observable<String> earlySource = Observable.fromArray(data) .take(3) .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b) -> a); Observable<String> middleSource = Observable.just(data[3]) .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a,b) -> a); Observable<String> lateSource = Observable.just(data[4], data[5]) .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a); Observable.concat(earlySource, middleSource, lateSource) .window(3) .subscribe( observable -> { Log.d(“New Observable Started!!”); observable.subscribe(Log::it); });
결과는…
RxComputationThreadPool-1 | 314 | debug = New Observable Started!!
RxComputationThreadPool-1 | 316 | value = 1
RxComputationThreadPool-1 | 398 | value = 2
RxComputationThreadPool-1 | 497 | value = 3
RxComputationThreadPool-1 | 802 | debug = New Observable Started!!
RxComputationThreadPool-1 | 802 | value = 4
RxComputationThreadPool-1 | 903 | value = 5
RxComputationThreadPool-1 | 1004 | value = 6
7.3.5. debounce() 함수
-
빠르게 연속 이벤트를 처리하는 흐름 제어 함수이다.
@SchedulerSupport(SchedulerSupport.COMPUTATION) public final Observable<T> debounce(long timeout, TimeUnit unit)
timeout 에서 지정한 시간 동안 "추가 이벤트가 발생하지 않으면" 마지막 이벤트를 최종적으로 발행한다. (돼왕 : 추가 이벤트가 발생하지 않으면이 throttle 함수와의 차이점이다.)
UI 환경에서 클릭 이벤트가 동시에 여러개 들어올 경우와 같은 케이스에 마지막 이벤트만 처리하도록 하는 데 유용하다.
7.4. 마치며
'프로그래밍 놀이터 > 안드로이드, Java' 카테고리의 다른 글
[android] File-Based Encryption & Direct Boot mode (0) | 2019.07.08 |
---|---|
[RxJava] #8 테스팅과 Flowable (0) | 2019.06.12 |
[RxJava] #6 안드로이드의 RxJava 활용 (0) | 2019.06.10 |
[RxJava] #5 스케줄러 (0) | 2019.06.07 |
[RxJava] #4 리액티브 연산자의 활용 (0) | 2019.06.05 |
댓글