태터데스크 관리자

도움말
닫기
적용하기   첫페이지 만들기

태터데스크 메시지

저장하였습니다.
2019.06.11 10:56


[RxJava] #7 디버깅과 예외 처리

Buffer, debounce, doOnComplete, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnTerminate, getError, getvalue, isOnComplete, isOnError, isOnNext, OnDispose, onerrorresumenext, onerrorreturn, onErrorReturnItem, OnSubscribe, retry, retryUntil, retryWhen, rxjava buffer, rxjava debounce, rxjava debug, rxjava debugging, rxjava dooncomplete, rxjava doondispose, rxjava dooneach, rxjava doonerror, rxjava doonlifecycle, rxjava doonnext, rxjava doonsubscribe, rxjava doonterminate, rxjava exception, rxjava onerrorresumenext, rxjava onerrorreturn, rxjava onerrorreturnitem, rxjava retry, rxjava retryuntil, rxjava retrywhen, rxjava sample, rxjava singleclick debounce, rxjava throttlefirst, rxjava throttlelast, rxjava try-catch, rxjava windows, rxjava 디버깅, rxjava 예외 처리, sample, thread-safe, throttleFirst, throttleLast, windows, [RxJava] #7 디버깅과 예외 처리, 흐름 제어

7.1. 디버깅


7.1.1. doOnNext(), doOnComplete(), doOnError() 함수


-

onNext, onError, onComplete 에 대해 doOnNext(), doOnComplete(), doOnError() 가 매핑된다.




7.1.2. doOnEach() 함수


-

doOnEach 는 onNext, onComplete, onError 이벤트를 한번에 처리할 수 있어 편하다.

noti.isOnNext(), noti.isOnComplete(), noti.isOnError() 를 통해 상태를 조회할 수 있다.

onNext() 함수의 경우 getValue() 호출하면 발행한 값을 얻을 수 있고,

onError() 함수의 경우 getError() 함수를 호출하면 Throwable 객체를 얻어올 수 있다.




7.1.3. doOnSubscribe(), doOnDispose(), 기타 함수


-

Observable 의 알림 이벤트 중에는 OnSubscribe 와 OnDispose 이벤트도 있다.

각각 Observable 을 구독했을 때와 구독 해지했을 떄의 이벤트를 처리할 수 있다.

public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
public final Observable<T> doOnDispose(Action onDispose)


-

doOnDispose 의 Action 객체는 thread-safe 하게 동작해야 한다.



-

doOnSubscribe 와 doOnDispose 함수를 각각 호출하지 않고 한꺼번에 호출하는 함수인 doOnLifecycle() 도 있다.



-

doOnTerminate() 함수는 Observable 이 끝나는 조건인 onComplete 혹은 onError 이벤트가 발생했을 때 실행하는 함수이다.

정확히는 onComplete() 혹은 onError 이벤트 발생 직전에 호출된다.





7.2. 예외 처리


-

try-catch 문은 RxJava 에서 활용할 수 없다.

추가로 함수 체인이나 Observable 내부에서 예외가 발생해도 onError 이벤트가 발생하고 try-catch 문으로는 해결할 수 없다.

즉, RxJava 에서는 다른 방식의 예외 처리 방법을 제공한다.




7.2.1. onErrorReturn() 함수


-

RxJava 에서는 에러도 어떠한 데이터로 보는 것이 적절하다.

따라서 예외 처리하는 첫 번째 방식은 예외가 발생했을 때 에러를 의미하는 다른 데이터로 대체하는 것이다.

onError 이벤트는 데이터 흐름이 바로 중단되므로, subscribe() 함수를 호출할 때 onError 이벤트를 처리하는 것은 Out Of Memory 같은 중대한 에러가 발생했을 때만 활용한다.



-

Observable.fromArray(grades)
    .map(data -> Integer.parseInt(data))
    .onErrorReturn(e -> {
        if(e instanceof NumberFormatException){
            e.printStackTrace();
        }
        return -1;
    })
    .subscribe(data -> {
        if( data < 0 ){
            Log.e(“Wrong Data found!!”);
            return;
        }
        Log.i(“Grade is “ + data);
    });

-

onError 대비 onErrorReturn 함수는 장점이 있다.

1. 예외 발생이 예상되는 부분을 선언적으로 처리할 수 있다.

2. Observable 을 생성하는 측과 구독하는 측이 서로 다를 수 있다.

구독하는 Observable 에서 발생할 수 있는 예외를 구독한 이후에 모두 파악하는 것이 어렵다. Observable 에서는 에러 가능성을 명시하지 않았는데 구독자가 필요한 예외 처리를 빠짐없이 하는 것이 어렵다는 뜻이다. Observable 을 생성하는 측에서 발생할 수 있는 예외 처리를 미리 해두면 구독자는 선언된 예외 상황을 보고 그에 맞는 처리를 할 수 있다.



-

onErrorReturnItem() 도 있다.

onErrorReturn() 함수와 동일하지만 Throwable 객체를 인자로 전달하지 않기 때문에 코드가 더 간결해진다. 단, 예외 종류는 확인할 수 없다.




7.2.2. onErrorResumeNext() 함수


-

onErrorReturn 과 onErrorReturnItem 함수는 에러가 발생한 시점에 특정 값으로 대체하는 것이었다.

onErrorResumeNext 는 에러가 발생했을 때 내가 원하는 Observable 로 대체하는 방법이다.

이는 에러 발생 시 데이터를 교체하는 것뿐만 아니라 관리자에게 이메일을 보내거나 자원을 해제하는 등의 추가 작업을 할 때 유용하다.



-

Observable<Integer> onParseError = Observable.defer( () -> {
    Log.d(“send email to administrator”);
    return Observable.just(-1);
}
.subscribeOn(Schedulers.io());

Observable.fromArray(salesData)
    .map(Integer::parseInt)
    .onErrorResumeNext(onParseError)
    .subscribe(data -> {
        if(data < 0){
            return;
        }
        Log.i(“Sales data : “ + data);
    }



7.2.3. retry() 함수


-

retry() 함수는 Observable 에서 onError 이벤트가 발생하면 바로 다시 subscribe() 함수를 호출하여 재구독하게 되어 있다.

여러가지 오버로딩 함수들이 있어 retry 에 대한 자세한 설정을 할 수 있다.



-

Observable.just(url)
    .map(OkHttpHelper::getT)
    .retry( (retryCnt, e) -> {
        if(retryCnt < RETRY_MAX){
            CommUtils.sleep(RETRY_DELAY);
            return true;
        }
        return false;
    })
    .onErrorReturn(e -> CommUtils.ERROR_CODE)
    .subscribe(Log::i);





7.2.4. retryUntil() 함수


-

특정 조건이 충족할 때까지 재시도하는 함수이다.

Observable.just(url)
    .map(OkHttpHelper::getT)
    .subscribeOn(Schedulers.io())
    .retryUntil( () -> {
        if(CommUtils.isNetworkAvailable()){
            return true; // 인터넷 가능한데도 실패했다면, 중지
        }
        CommUtils.sleep(1000);
        return false; // 인터넷이 불가능했다면, 잠시 후 다시 시도
    } )
    .subscribe(Log::i);



7.2.5. retryWhen() 함수


-

Observable
    .create( (ObservableEmitter<String> emitter) -> {
        emitter.onError(new RuntimeException(“always fails”));
    }
    .retryWhen( Observable attempts -> {
        return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
            return Observable.timer(i, TimeUnit.SECONDS);
        }
    }
    .blockingForEach(Log::d);

// 3번 delay 재시도를 한다.




7.3. 흐름 제어


-

흐름 제어는 Observable 이 데이터를 발행하는 속도와 옵저버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때 사용하는 함수이다.




7.3.1. sample() 함수


-

특정한 시간 동안 가장 최근에 발행된 데이터만 걸러준다.

해당 시간에는 아무리 많은 데이터가 들어와도 해당 구간의 마지막 데이터만 발행하고 나머지는 무시한다.



-

String[] data = {"1", "7", "2", "3", "6"};

Observable<String> earlySource
= Observable.fromArray(data)
    .take(4)
    .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a);

Observable<String> lateSource
= Observable.just(data[4])
    .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a, b) -> a);

Observable.concat(earlySource, lateSource)
    .sample(300L, TimeUnit.MILLISECONDS) // 마지막 인자(emitLast)에 true 를 주면 최종값도 사용한다.
    .subscribe(Log::it);

결과는..

RxComputationThreadPool-1 | 543 | value = 7

RxComputationThreadPool-1 | 843 | value = 3




7.3.2. buffer() 함수


-

sample() 함수는 특정 시간 간격을 기준으로 가장 최근에 발행된 데이터만 넘겨주고 나머지는 무시하는 반면 buffer() 함수는 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해준다.



-

String[] data = {"1", "2", "3", "4", "5", "6"};

Observable<String> earlySource
= Observable.fromArray(data)
    .take(3)
    .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a);

Observable<String> middleSource
= Observable.just(data[3])
    .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a, b) -> a);

Observable<String> lateSource
= Observable.just(data[4], data[5])
    .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a);

Observable.concat(earlySource, middleSource, lateSource)
    .buffer(3) // 3개씩 전달한다.
    .subscribe(Log::it);

결과는…

RxComputationThreadPool-1 | 506 | value = [1, 2, 3]

RxComputationThreadPool-3 | 1013 | value = [4, 5, 6]



-

buffer 의 두번째 인자는 skip 인데.. 첫번째 인자인 count 값보다 커야 한다.

예를 들어 (2,3) 으로 전달하면 2개를 받고, 3번째 녀석은 무시한다.






7.3.3. throttleFirst() 와 throttleLast() 함수


-

throttleFirst() 는 주어진 조건에서 가장 먼저 입력된 값을 발행한다.

throttleLast() 는 주어진 조건에서 가장 마지막에 입력된 값을 발행한다.

윈도우 shifting 하면서 첫번째 혹은 마지막 값을 전달한다고 보면 된다.



-

String[] data = {"1", "2", "3", "4", "5", "6"};

Observable<String> earlySource 
= Observable.just(data[0])
    .zipWith(Observable.timer(100L, TimeUnit.MILLISECONDS), (a,b) -> a);

Observable<String> middleSource
= Observable.just(data[1])
    .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a,b) -> a);

Observable<String> lateSource 
= Observable.just(data[2], data[3], data[4], data[5])
    .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a);

Observable.concat(earlySource, middleSource, lateSource)
    .throttleFirst(200L, TimeUnit.MILLISECONDS)
    .subscribe(Log::it);

결과는..

RxComputationThreadPool-1 | 305 | value = 1

RxComputationThreadPool-3 | 609 | value = 2

RxComputationThreadPool-4 | 305 | value = 4

RxComputationThreadPool-4 | 305 | value = 6




7.3.4. windows() 함수


-

groupBy() 함수와 개념적으로 비슷하다.

groupBy() 함수는 특정 조건에 맞는 입력값들을 그룹화해 별도의 Observable 을 병렬로 만든다.

반면 window() 함수는 throttleFirst() 나 sample() 함수처럼 내가 처리할 수 있는 일부의 값들만 받아들일 수 있다.



-

@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Observable<T>> window(long count)

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)


-

String[] data = {"1", "2", "3", "4", "5", "6"};

Observable<String> earlySource 
= Observable.fromArray(data)
    .take(3)
    .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b) -> a);

Observable<String> middleSource
= Observable.just(data[3])
    .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a,b) -> a);

Observable<String> lateSource 
= Observable.just(data[4], data[5])
    .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a);

Observable.concat(earlySource, middleSource, lateSource)
    .window(3)
    .subscribe( observable -> {
        Log.d(“New Observable Started!!”);
        observable.subscribe(Log::it);
    });

결과는…

RxComputationThreadPool-1 | 314 | debug = New Observable Started!!

RxComputationThreadPool-1 | 316 | value = 1

RxComputationThreadPool-1 | 398 | value = 2

RxComputationThreadPool-1 | 497 | value = 3

RxComputationThreadPool-1 | 802 | debug = New Observable Started!!

RxComputationThreadPool-1 | 802 | value = 4

RxComputationThreadPool-1 | 903 | value = 5

RxComputationThreadPool-1 | 1004 | value = 6




7.3.5. debounce() 함수


-

빠르게 연속 이벤트를 처리하는 흐름 제어 함수이다.

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> debounce(long timeout, TimeUnit unit)

timeout 에서 지정한 시간 동안 "추가 이벤트가 발생하지 않으면" 마지막 이벤트를 최종적으로 발행한다. (돼왕 : 추가 이벤트가 발생하지 않으면이 throttle 함수와의 차이점이다.)

UI 환경에서 클릭 이벤트가 동시에 여러개 들어올 경우와 같은 케이스에 마지막 이벤트만 처리하도록 하는 데 유용하다.





7.4. 마치며




댓글을 달아 주세요


Posted by 돼지왕왕돼지