태터데스크 관리자

도움말
닫기
적용하기   첫페이지 만들기

태터데스크 메시지

저장하였습니다.
2019.06.12 11:54


[RxJava] #8 테스팅과 Flowable


assertComplete, assertEquals, assertFailure, assertFailureAndMessage, assertResult, awaitDone, debounce, flowable, flowable vs sample throttle debounce, juni5, juni5 modules, junit jupiter, junit vintage, Jupiter, MissingBackpressureException, observable vs flowable, onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest, rxjava awaitdone, rxjava backpressure, rxjava flowable, rxjava onBackpressureBuffer, rxjava onBackpressureDrop, rxjava onBackpressureLatest, rxjava test, rxjava testing, rxjava tet, rxjava 배압, rxjava 비동기 코드 테스트, sample, TestObserver, Throttle, toFlowable, toObservable, txjava flowable, txjava junit, Vintage, when to use flowable, [RxJava] #8 테스팅과 Flowable


8.1. JUnit 5 활용


-

JUnit 5 의 가장 큰 특징은 모듈화.

JUnit 4 까지는 단일 jar 파일로 구성되었다.



-

JUnit 5 = JUnit Platform + JUnit Jupiter + JUnit Vintage


처음 JUnit 5 를 활용하는 개발자는 Jupiter 만 참조하면 된다, Jupiter 는 내부적으로 JUnit Platform 을 의존한다.

JUnit Vintage 는 JUnit 3 혹은 JUnit 4 에서 작성한 테스트 코드를 JUnit 5 로 실행할 때 필요하다.



-

다음과 같이 gradle 에 설정이 필요하다.

// Version 은 알아서 최신으로!
testCompile ‘org.junit.jupiter:junit-jupiter-api:5.0.0-RC2’

testRuntime ‘org.junit.jupiter:junit-jupiter-engine:5.0.0-RC2’

testCompile ‘org.junit.platform:junit-platform-runner:1.0.0-RC2’



-

@RunWith(JUnitPlatform.class)
public class JUnit5Basic{
    @DisplayName(“JUnit 5 First Example”)
    @Test
    void testFirst(){
        assertEquals(expected, actual);
    }
}



8.2. TestObserver 클래스


-

TestObserver 클래스의 주요 함수는 아래와 같다.


assertResult : assertEquals 와 같다.

assertFailure : 기대했던 에러(onError 이벤트)가 발생하는지 확인하는 코드

assertFailureAndMessage

awaitDone : interval() 함수처럼 비동기로 동작하는 Observable 코드를 테스트할 수 있다.

assertComplete : Observable 을 정상적으로 완료했는지 확인한다.



-

Observable.fromArray(data)
    .map(Integer::parseInt)
    .test()
    .assertFailure(NumberFormatException.class, 100, 200);



8.3. 비동기 코드 테스트


-

Observable.interval(100L, TimeUnit.MILLISECONDS)
    .take(5)
    .map(Long::intValue)
    .doOnNext(Log::d)
    .test()
    .assertResult(0, 1, 2, 3, 4);

위 코드는 실패한다.

이유는 interval 함수가 main 스레드가 아닌 계산 스케줄러에서 실행되기 때문이다.



-

위의 코드에서 아래 구문을 넣어주면 성공한다.

.awaitDone(1L, TimeUnit.SECONDS)






8.4. Flowable 클래스


-

Flowable 은 RxJava 2.x 에 새로 도입된 클래스이다.

Flowable 은 배압(backpressure) 이슈를 위해 별도 분리한 클래스이다.

기존의 RxJava 1.x 에는 Observable 배압 관련된 함수들을 포함했었다.


Flowable 클래스를 도입한 이유는 Observable 클래스의 성능을 향상시키기 위해서이다.

기존의 Observable 클래스는 배압에 관한 처리가 불필요한 경우에는 초기 로딩 때문에 약간의 오버헤드가 있었다.

RxJava2 의 Observable 클래스에는 배압으로 인한 성능 오버헤드가 사라졌다.



-

Flowable 클래스의 활용은 기본적으로 Observable 과 동일하다.

또한 Flowable 에서 Observable 로 상호 변환도 어렵지 않다.

변환을 위해 toObservable() 과 toFlowable() 함수를 제공한다.



-

Flowable.just(“Hello world”)
    .subscribe(System.out:println)


8.4.1. Observable 과 Flowable 의 선택 기준


-

Observable 을 사용해야 할 때는 아래와 같다.


1. 최대 1,000개 미만의 데이터 흐름으로 OOM 이 발생할 확률이 거의 없는 경우

2. 마우스 이벤트나 터치 이벤트를 다루는 GUI 프로그래밍. 배압 문제가 거의 발생하지 않는다.

3. 데이터 흐름이 본질적으로 동기 방식이지만 프로젝트에서 사용하는 플랫폼이 자바 Stream API 나 그에 준하는 기능을 제공하지 않을 때.



-

Flowable 을 선택해야 할 때는 다음과 같다.


1. 특정 방식으로 생성된 10,000 개 이상의 데이터를 처리하는 경우. 메서드 체인에서 데이터 소스에 데이터 개수 제한을 요청해야 한다.

2. 디스크에서 파일을 읽어 들일 경우. 본질적으로 블로킹 I/O 방식을 활용하고 내가 원하는 만큼 가져오는 방식(pull-based)으로 처리해야 하기 때문이다.

3. JDBC 를 활용해 데이터베이스의 쿼리 결과를 가져오는 경우, 블로킹 방식을 이용하므로 ResultSet.next() 를 호출하는 방식으로 쿼리의 결과를 읽어오도록 제어할 수 있다.

4. 네트워크 I/O 를 실행하는 경우. 네트워크나 프로토콜을 통해 서버에서 가져오길 원하는 만큼의 데이터양을 요청할 수 있을 때

5. 다수의 블로킹 방식을 사용하거나 가져오는 방식(pull-based)의 데이터 소스가 미래에는 논 블로킹(non-blocking) 방식의 리액티브 API 나 드라이버를 제공할 수 있는 경우.



-

데이터 발행과 처리 속도가 차이 나더라도 먼저 sample(), throttle(), debounce() 같은 흐름 제어 함수를 활용하여 해결하는 것이 좋다.

이러한 함수로 해결하기 어려울 때 Flowable 클래스로 전환하면 된다.




8.4.2. Flowable 을 활용한 배압 이슈 대응


-

Flowable 에서 제공하는 배압 이슈에 대응하는 함수는 다음과 같다.


onBackpressureBuffer() : 배압 이슈가 발생했을 때 별도의 버퍼에 저장한다. 기본적으로 128개의 버퍼가 있다.

onBackpressureDrop() : 배압 이슈가 발생했을 때 해당 데이터를 무시한다.

onBackpressureLatest() : 처리할 수 없어서 쌓이는 데이터를 무시하면서 최신 데이터만 유지한다.



-

PublishSubject<Integer> subject = PublishSubject.create()
    .observeOn(Schedulers.computation())
    .subscribe(data -> {
        CommUtils.sleep(100); // 100ms 후 데이터 처리
        Log.it(data);
    }, err -> {
        Log.e(err.toString());
    });

for(int i=0; i < 50_000_000; i++){
    subject.onNext(i);
}
subject.onComplete();

돼왕 : 위 예제를 돌리면 backpressure 가 발생하면서 MissingBackpressureException 이 던져진다.


-

public final Flowable<T> onBackpressureBuffer()

    기본값 128개의 버퍼 개수를 가진다.


public final Flowable<T> onBackpressureBuffer(boolean delayError)

    true 면 예외가 발생했을 때 버퍼에 쌓인 데이터를 모두 처리할 때까지 예외를 던지지 않는다.

    false 면 예외가 발생했을 때 바로 다운스트림에 예외를 던진다. 기본값은 false 이다.


public final Flowable<T> onBackpressureBuffer(int capacity, Action onOverflow)

    capacity 인자로 버퍼 개수를 지정.

    onOverflow 인자에 버퍼가 넘쳤을 때 실행할 동작을 지정한다.


public final Flowable<T> onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy)

    버퍼가 가득 찼을 때 추가로 실행하는 전략을 지정할 수 있다.

    전략은 아래와 같은 것들이 있다.

        ERROR : MissingBackpressureException 예외를 던지고 데이터 흐름을 중단한다.

        DROP_LATEST : 버퍼에 쌓여 있는 최근 값을 제거한다.

        DROP_OLDEST : 버퍼에 쌓여 있는 가장 오래된 값을 제거한다.



-

onBackpressureBuffer() 함수가 버퍼를 만들어 쌓아 두었다가 처리하는 방식이라면, onBackpressureDrop() 함수는 버퍼가 가득 찼을 때 이후 데이터를 그냥 무시한다.



-

onBackpressureLatest() 는 buffer 와 drop 을 섞은 것으로 마지막 값을 발행할 수 있도록 해준다.



-

돼왕back pressure 에 대해서는 아래 글을 한번 읽어보길 추천한다.

https://www.baeldung.com/rxjava-backpressure




8.5. 마치며




댓글을 달아 주세요


Posted by 돼지왕왕돼지