본문 바로가기
프로그래밍 놀이터/안드로이드, Java

[RxJava] #4 리액티브 연산자의 활용

by 돼지왕 왕돼지 2019. 6. 5.
반응형


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자


4.1. 생성 연산자


-

생성 연산자의 역할은 데이터 흐름을 만드는 것이다.

간단하게 Observable, Single, Maybe 객체 등을 만든다고 생각하면 된다.




4.1.1. interval() 함수


-

일정 시간 간격으로 데이터 흐름을 생성한다.

주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체를 반환한다.



-

interval() 함수는 기본적으로 영원히 지속 실행되기 때문에 폴링 용도로 많이 사용된다.

수행은 RxComputationalThreadPool 에서 수행된다.

interval 값이 초기 지연값으로도 활용된다.




4.1.2. timer() 함수


-

interval 함수와 유사하지만 한 번만 실행하는 함수.

일정 시간이 지난 후에 한 개의 데이터를 발행하고 onComplete() 이벤트가 발생한다.

발행하는 값은 0L 이다.




4.1.3. range() 함수


-

주어진 값 n 부터 m 개의 Integer 객체를 발행한다.

반복문(for, while)을 대체할 수 있다.




4.1.4. intervalRange() 함수


-

interval 처럼 일정한 시간 간격으로 값을 출력하지만 range 처럼 시작 숫자(n) 로부터 m개만큼의 값을 생성하고 onComplete 이벤트가 발생한다.

즉, interval 처럼 무한히 데이터 흐름을 발행하지 않는다.




4.1.5. defer() 함수


-

timer 함수와 비슷하지만 데이터 흐름 생성을 구독자가 subscribe() 함수를 호출할 때까지 미룰 수 있다.


-

@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)


-
돼왕 : 위의 글만 읽고서는 정확한 이해가 안 된다.
이해 안 되는 이유는.. 대부분이 cold observable 인데 defer 의 특별한 점을 모르겠다는 점 때문일 것이다.
요지는 defer 를 사용하면 subscribe 가 되는 순간 새로운 observable 을 제공하며 create 보다 더 nicer 한 간편한 코드를 만들 수 있다는 것이다.




4.1.6. repeat() 함수


-

String[] balls = { “1”, “3”, “5” };
Observable.fromArrays(balls).repeat(3).doOnComplete( () -> Log.d(“onComplete”) ).subscribe(Log::i);

결과는.. 1, 3, 5, 를 3번 반복해서 print 하고 그 다음 onComplete 를 print



-

repeat 함수는 동작이 한 번 끝난 다음에 다시 구독하는 방식으로 동작한다

그리고 다시 구독할 때마다 동작하는 스레드의 번호가 달라질 수 있다.





4.2. 변환 연산자


4.2.1. concatMap() 함수


-

flatMap 은 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면 나중에 들어온 데이터의 처리 결과가 먼저 출력될 수도 있다.

이를 인터리빙(interleaving, 끼어들기)라고 한다.

하지만 concatMap 은 먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장해준다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자


-

concatMap() 함수를 flatMap() 함수로 변경하면 interleaving(인터리빙)을 허용하기 때문에 순서 보장은 안 되지만 훨씬 빠르다.




4.2.2. switchMap() 함수


-

concatMap() 함수가 인터리빙이 발생할 수 있는 상황에서 동작의 순서를 보장해준다면, switchMap() 함수는 순서를 보장하기 위해 기존에 진행 중이던 작업을 바로 중단한다.

그리고 여러 개의 값이 발행 되었을 때 마지막에 들어온 값만 처리하고 싶을 때 사용한다.

중간에 끊기더라도 마지막 데이터의 처리는 보장한다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자



-

CommonUtils.exampleStart();

String[] balls = {“1”, “3”, “5”};
Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map(Long::intValue)
    .map(idx -> balls[idx])
    .take(balls.length)
    .switchMap(ball -> Observable.interval(200L, TimeUnit.MILLISECONDS)
            .map(notUsed -> ball + “<>”)
            .take(2)
    );
source.subscribe(Log::it);
CommonUtils.sleep(2000);

결과는..

RxComputationThreadPool-4 | 685 | value = 5<>

RxComputationThreadPool-4 | 884 | value = 5<>



-

doOnNext() 를 넣어 중간 결과 알아보기


switchMap 전에 doOnNext(Log::dt) 를 넣어주어 중간결과값을 확인한다.






4.2.3. groupBy() 함수


-

어떤 기준(keySelector 인자)으로 단일 Observable 을 여러 개로 이루어진 Observable 그룹(GroupedObservable)으로 만든다.



-

String[] objs = {“6”, “4”, “2-T”, “2”, “6-T”, “4-T”};
Observable<GroupedObservable<String, String>> source = Observable.fromArray(objs).groupBy(CommonUtils::getShape);
source.subscribe(obj -> { // obj 는 GroupedObservable 이며 group 조건이 된 key 를 가지고 있다.
    obj.subscribe( val -> // GroupedObservable 을 subscribe 하면 value 가 전달된다.
        System.out.println(“GROUP:” + obj.getKey() + “\t Value:” + val)
    );
});

CommonUtils.getShape 함수는 아래와 같은 형태이다.
fun CommUtils.getShape(obj:String):String{
    if(obj.endsWith("-T")){
        return "TRIANGLE"
    }else{
        return "BALL"
    }
}

결과는..

GROUP:BALL    VALUE:6

GROUP:BALL    VALUE:4

GROUP:TRIANGLE    VALUE:2-T

GROUP:BALL    VALUE:2

GROUP:TRIANGLE    VALUE:6-T

GROUP:TRIANGLE    VALUE:4-T


groupping 은 되었지만, 순서는 objs 의 전달 순서를 보장함을 눈여겨봐야한다.



-

GroupedObservable 은 Observable 과 동일하지만 getKey() 라는 메서드를 제공하여 구분된 그룹을 알 수 있게 해준다.




4.2.4. scan() 함수


-

scan 함수는 실행할 때마다 입력값에 맞는 중간 결과 및 최종 결과를 구독자에게 발행한다.



-

reduce 함수의 경우 마지막 값이 입력되지 않거나 onComplete 이벤트가 발생하지 않으면 구독자에게 값을 발행하지 않는다.

최악의 경우에는 값을 전혀 발행하지 않고 종료할 수도 있으므로 Maybe 클래스 타입으로 정의.

반면 scan 함수는 값이 입력될 때마다 구독자에게 값을 발행한다. 따라서 Maybe 가 아닌 Observable 이다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자





4.3. 결합 연산자


4.3.1. zip() 함수


-

각각의 Observable 을 모두 활용해 2개 혹은 그 이상의 Observable 을 결합한다.

A, B 두 개의 Observable 을 결합한다면 2개의 Observable 에서 모두 데이터를 발행해야 결합할 수 있다.

그전까지는 발행을 기다린다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자



-

String[] shapes = {“BALL”, “PENTAGON”, “STAR”}; String[] coloredTriangles = {“2-T”, “6-T”, “4-T”}; Observable.zip( Observable.fromArray(shapes).map(Shape::getSuffix), Observable.fromArray(coloredTriangles).map(Shape::getColor), (suffix, color) -> color + suffix ) .subscribe(Log::i)



-

Observable<Integer> source = Observable.zip(
    Observable.just(100, 200, 300),
    Observable.just(10, 20, 30),
    (a, b) -> a + b)
    .zipWith(Observable.just(1,2,3), (ab, c) -> ab + c);
source.subscribe(Log::i)

zip 으로 3개를 결부해도 되는 것을, zip 으로 2개를 결부하고, zipWith 로 한번 더 결부하는 형태이다.




4.3.2. combineLatest() 함수


-

2개 이상의 Observable 을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수이다.

첫번째 Observable 에서만 데이터를 발행하거나 두 번째 Observable 의 데이터 흐름만 있으면 구독자에게 어떤 데이터도 발행하지 않는다.

하지만 두 Observable 모두 값을 발행하면 그때는 결괏값이 나온다.

그 다음부터는 둘 중에 어떤 것이 갱신되던지 최신 결과값을 보여준다.


zip 함수처럼 결합하고자 하는 첫 번째와 두 번째 Observable 을 넣고 마지막으로 그것을 결합하는 combiner 함수를 넣어주면 된다.

입력할 수 있는 Observable 의 개수는 최대 9개


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자



-

String[] data1 = { “6”, “7”, “4”, “2” };
String[] data2 = {“Diamond”, “Star”, “Pentagon”};

Observable<String> source = Observable.combineLatest(
    Observable.fromArray(data1).zipWith(
        Observable.interval(100L, TimeUnit.MILLISECONDS), (shape, notUsed) -> Shape.getColor(shape)),
    Observable.fromArray(data2).zipWith(
        Observable.interval(150L, 200L, TimeUnit.MILLISECONDS), (shape, notUsed) -> Shape.getSuffix(shape)),
    (v1, v2) -> v1 + v2);
sopurce.subscribe(Log::i);


-

combineLatest() 함수의 대표적인 활용 예는 엑셀의 셀이다.

예를 들어 어떤 셀에 =A+B 라는 수식을 넣었다면 A셀과 B셀의 어떤 값이 변경되든 즉시 새로운 합의 결과를 표시한다.



-

Observable.startWith(0)의 startWith  함수는 초기값을 지정해 줄 수 있다.



-

ConnectableObservable 은 observable.publish 를 통해 생성가능하며, connect() 가 불린 순간부터 값을 생산하기 시작한다.






4.3.3. merge() 함수


-

입력 Observable 의 순서와 모든 Observable 이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행한다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자



-

String[] data1 = {“1”, “3”};
String[] data2 = {“2”, “4”, “6”};

Observable<String> source1 = Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
    .map(Long::intValue)
    .map(idx -> data1[idx])
    .take(data1.length)

Observable<String> source2 = Observable.interval(50L, TimeUnit.MILLISECONDS)
    .map(Long::intValue)
    .map(idx -> data2[idx])
    .take(data2.length)

Observable.merge(source1, source2).subscribe(Log::i);

결과는..

RxComputationThreadPool-1 | value = 1

RxComputationThreadPool-2 | value = 2

RxComputationThreadPool-1 | value = 3

RxComputationThreadPool-2 | value = 4

RxComputationThreadPool-2 | value = 6


각각의 thread 에서 처리한다는 것은 흥미로운 정보




4.3.4. concat() 함수


-

2개 이상의 Observable 을 이어 붙여준다.

첫번째 Observable 에서 onComplete 이벤트가 발생해야 두번째 Observable 을 구독한다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자



-

Observable 의 중간 결과를 간편하게 확인할 수 있는 함수들이 있다.

doOnNext(), doOnComplete(), doOnError() 함수들이 그것이다.





4.4. 조건 연산자


-

조건 연산자는 Observable 의 흐름을 제어하는 역할을 한다.




4.4.1. amb() 함수


-

여러 개의 Observable 중에서 1개의 Observable 을 선택하는데, 선택 기준은 가장 먼저 데이터를 발행하는 Observable 이다.

이후에 나머지 Observable 에서 발행하는 데이터는 모두 무시한다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자



-

List<Observable<String>> sources = Arrays.asList(
    Observable.fromArray(data1),
    Observable.fromArray(data2).delay(100L, TimeUnit.MILLISECONDS));
Observable.amb(sources).subscribe(Log::i);



4.4.2. takeUntil() 함수


-

take 함수에 조건을 설정하는 것이다.

인자로 받은 Observable 에서 어떤 값을 발행하면 현재 Observable 의 데이터 발행을 중단하고 즉시 완료(onComplete 이벤트 발생)한다.

take 함수처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable 에서 값을 발행하는지로 판단하는 것이다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자



-

String[] data = {“1”, “2”, “3”, “4”, “5”, “6”};
Observable<String> source = Observable.fromArray(data)
    .zipWith(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        (val, notUsed) -> val))
    .takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));
source.subscribe(Log::i);

결과는..

ExComputationThreadPool-2 | value = 1

ExComputationThreadPool-2 | value = 2

ExComputationThreadPool-2 | value = 3

ExComputationThreadPool-2 | value = 4



-

비슷한 함수로 skipWhile() 이 있다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자






4.4.3. skipUntil() 함수


-

skipUntil 은 takeUntil 과 정반대의 함수이다.

other Observable 을 인자로 받는다는 점은 같지만 Observable 에서 데이터를 발행할 때까지 값을 건너뛴다.


Average, Buffer, combinelatest excel, combinelatest 의 이해, concatMap, concatmap vs flatmap, connect, connectableobservable, Count, debounce, flatmap interleaving, flatmap 인터리빙, groupedobservable, interleaving, Max, maybe, Min, observable, Publish, rxandroid, rxapachehttp, RxComputationalThreadPool, rxjava all, rxjava amb, rxjava combinelatest, rxjava concat, rxjava concatmap, rxjava defer, rxjava delay, rxjava doonnext, rxjava groupby, rxjava GroupedObservable, rxjava interleaving, rxjava interval, rxjava intervalrange, rxjava merge, rxjava range, rxjava repeat, rxjava scan, rxjava scan vs reduce, rxjava skipwhile, rxjava switchmap, rxjava takeuntil, rxjava timeinterval, rxjava timer, rxjava zip, rxjava 인터리빙, rxjava 확장 모듈, rxjava 흐름 제어, RxJava2Extensions, rxjavamath, rxjavamath rxjava2, rxnetty, sample, Single, startWith, Sum, takeunti with timer, why reduce maybe, window, [RxJava] #4 리액티브 연산자의 활용, 결합 연산자, 기타 연산자, 변환 연산자, 생성 연산자, 수학 연산자, 수학 함수, 순서 처리 보장, 인터리빙, 조건 연산자




4.4.4. all() 함수


-

주어진 조건에 100% 맞을 때만 true 값을 발행하고 조건에 맞지 않는 데이터가 발행되면 바로 false 값을 발행한다.



-

Observable.fromArray(data)
    .map(Shape::getShape)
    .all(Shape.BALL::equals)
    .subscribe(Log::i);




4.5. 수학 및 기타 연산자


4.5.1. 수학 함수


-

RxJava 에는 여러 가지 확장 모듈이 존재한다.

RxAndroid, RxNetty, RxApacheHttp 등이 있다.


-

RxJava 1.x 에는 수학 함수들을 모은 RxJavaMath 가 있다.

RxJavaMath 는 RxJava 2.x 를 지원하지 않으므로 다른 라이브러리를 사용해야 한다.

RxJava2Extensions 라이브러리를 활용해야 한다.


gradle dependency 로 “com.github.akarnokd:rxjava2-extensions:0.17.6” 을 명시해준다. (Version 은 최신으로!)



-

count, max, min, sum, average 등이 있다.



-

Flowable.fromArray(data)
    .to(MathFlowable::max)
    .subscribe(max -> Log.i(“max is “ + max));


4.5.2. delay() 함수


-

delay 외에도 시간 관련 함수들은 buffer, debounce, sample, window 등이 있다.



-

단순히 인자로 전달받는 time 과 시간 단위만큼 입력받은 Observable 의 데이터 발행을 지연시켜주는 역할을 한다.



-

String[] data = {“1”, “7”, “2”, “3”, “4”};
Observable.fromArray(data)
    .delay(100L, TimeUnit.MILLISECONDS)
    .subscribe(Log::it);

100ms 이후부터 1 값을 생산한다.




4.5.3. timeInterval() 함수


-

어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지를 알려준다.



-

String[] data = {“1”, “3”, “7”};
Observable<Timed<String>> source = Observable.fromArray(data).delay(item -> {
        // do something
        return Observable.just(item);
})
.timeInterval()
source.subscribe(Log::it);




4.6. 마치며






반응형

댓글