본문 바로가기
프로그래밍 놀이터/안드로이드, Java

[RxJava] #2 Observable 처음 만들기

by 돼지왕 왕돼지 2019. 6. 3.
반응형


asyncsubject, back pressure, BehaviorSubject, cold observable, cold observable to hot observable, connect, connectableobservable, CREATE, create memory leak, defer, Disposable, dispose, elementAt, exjava frompublisher, firstelement, flatmapmaybe, flow api, flowable, From, fromarray, fromcallable, fromfuture, fromiterable, fromobservable, frompublisher, hot observable, hot observable example, hot observable 예, interval, isdisposed, just, just 함수 갯수, lastelement, maybe, observable, observable factory, observable 발행 속도, observable 팩토리 함수, oncomplete, onerror, onnext, onsuccess, Publish, PublishSubject, RANGE, Reduce, replysubject, rxjava, rxjava asyncsubject, rxjava back pressure, rxjava behaviorsubject, rxjava connectableobservable, rxjava create, rxjava disposable, rxjava flowable, rxjava fromarray, rxjava fromcallable, rxjava fromfuture, rxjava fromiterable, rxjava frommap, rxjava fromobservable, rxjava just, rxjava map from, rxjava maybe, rxjava observable, rxjava publishsubject, rxjava replaysubject, rxjava single, rxjava subject, rxjava 배압, Single, single illegalargumentexception, single observable conversion, single observable converting, singleelement, subscribe, subscribe overloading, subscribe 속도, timer, [RxJava] #2 Observable 처음 만들기, 뜨거운 observable, 배압, 서버 api 호출


-

Observable 은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록 한다.

RxJava 프로그래밍은 Observable 에서 시작해 그것으로 끝난대고 해도 과언이 아닐 정도로 중요한 개념이다.



-

Rajava 1.x 에서는 데이터 소스를 Observable 과 Single 클래스로 구성했다.

RxJava 2 에서는 Observable 클래스를 상황에 맞게 세분화해 각각 Observable, Maybe, Flowable 로 나뉘었고, Single 도 그대로 존재한다.



-

Maybe 클래스는 Reduce() 함수나 firstElement() 함수와 같이 데이터가 발행될 수 있거나 혹은 발행되지 않고도 완료되는 경우를 의미한다.



-

Flowable 클래스는 Observable 에서 데이터가 발행되는 속도가 구독자가 처리하는 속도보다 현저하게 빠른 경우 발생하는 배압(Back pressure)이슈에 대응하는 기능을 추가로 제공한다.





2.1. Observable 클래스


-

Observable 은 옵저버 패턴을 구현한다.

옵저버 패턴은 객체의 상태 변화를 관찰하는 관찰자(옵저버) 목록을 객체에 등록한다.

그리고 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵저버에게 변화를 알려준다.

라이프 사이클은 존재하지 않고 보통 단일 함수를 통해 변화만 알린다.



-

RxJava 의 Observable 은 세 가지 알림을 구독자에게 전달한다.


onNext: 데이터의 발행을 알린다. 기존의 옵저버 패턴과 같다.

onComplete: 모든 데이터의 발행을 완료했음을 알린다. 단 한번만 발생하며, 발생한 후에는 더 이상 onNext 이벤트가 발생해서는 안 된다.

onError : 어떤 이유로 에러가 발생했음을 알린다. 이후에 onNext 및 onComplete 이벤트가 발생하지 않는다. 즉, Observable 의 실행을 종료한다.



-

Observable 클래스에는 Observable 을 생성하는 팩토리 함수, 중간 결과를 처리하는 함수, 디버그 및 예외 처리 함수가 모두 포함되어 있다.



-

1.x 부터 있던 팩토리 메소드 : create(), just(), from()

2.x 추가 팩토리 메소드 : fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher()

기타 팩토리 메소드 : interval(), range(), timer(), defer()




2.1.1. just() 함수


-

just() 함수는 인자로 넣은 데이터를 차례로 발행하는 Observable 을 생성한다.

실제 데이터의 발행은 subscribe() 함수를 호출해야 시작한다.

한 개의 값을 넣을 수도 있고 인자로 여러개의 값(최대 10개)을 넣을 수도 있다.

단, 타입은 모두 같아야 한다.




2.1.2. subscribe() 함수와 Disposable 객체


-

RxJava 는 선언형 프로그래밍을 지향한다.

선언형 프로그래밍은 명령형 프로그래밍(Imperative) 의 반대말로 어떤 방법(how)으로 동작하는지가 아니라 프로그래밍할 대상이 무엇(what)인지 알려주는 것을 의미한다.

예를 들어 명령형에서는 실행할 알고리즘과 동작을 구체적으로 명시하지만, 선언형은 목표를 명시할 뿐 실행할 알고리즘을 명시하지 않는다.



-

Disposable subscribe()

    onNext() 와 onComplete() 이벤트를 무시하고 onError 이벤트가 발생했을 때만 OnErrorNotImplementedException 을 던진다. 따라서 Observable 로 작성한 코드를 테스트하거나 디버깅할 떄 활용한다.


Disposable subscribe(Consumer<? super T> onNext)

    onNext 이벤트를 처리한다. onError 이벤트가 발생하면 OnErrorNotImplementedException 을 던진다.


Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

    onNext 와 onError 이벤트를 처리한다.


Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)

    onNext, onError, onComplete 모두 처리할 수 있다.



-

Disposable 은 RxJava 1.x 의 Subscription(구독) 객체에 해당한다.

dispose(), isDisposed() 두개의 함수만 지원한다.


dispose() 는 구독을 해지하는 함수이다.

Observable 계약에 따르면 Observable이 onComplete 알림을 보냈을 때 자동으로 dispose() 를 호출해 Observable 과 구독자의 관계를 끊는다.






2.1.3. create() 함수


-

just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지만, create() 함수는 onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야 한다.


데이터를 발행할 때 onNext() 를 호출해주어야 하며, 해야 한다.



-

Observable<T> create(ObservableOnSubscribe<T> source)

public interface ObservableOnSubscribe<T>{
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

ex)
Observable<Integer> source = Observable.create( (ObservableEmitter<Integer> emitter) -> {
    emitter.onNext(100);
    emitter.onNext(200);
    emitter.onNext(300);
    emitter.onComplete();
});
source.subscribe(System.out::println);

-

create 로 만드는 Observable 은 cold observable 이다.

이는 observable 의 생성만으로 데이터를 발행하지 않고, subscribe() 를 호출했을 때 값을 발행한다는 이야기다.



-

Observable.create() 를 사용할 때는 주의해야 한다. 

1. Observable 이 구독 해지(dispose)되었을 때 등록된 콜백을 모두 해제해야 한다. 그렇지 않으면 memory leak 이 발생한다.

2. 구독하는 동안에만 onNext 와 onComplete 이벤트를 호출해야 한다.

3. 에러가 발생했을 때는 오직 onError 이벤트로만 에러를 전달해야 한다.

4. 배입(back pressure)를 직접 처리해야 한다.




2.1.4. fromArray() 함수


-

단일 데이터가 아닐 때는 fromXXX() 계열 함수를 사용한다.

RxJava 1.x 에서는 from() 과 fromCallable() 함수만 사용했다.

그런데 from() 함수를 배열, 반복자, 비동기 계산 등에 모두 사용하다 보니 모호함이 있었다.

따라서 RxJava 2 에서 from 함수를 세분화했다.



-

Integer[] arr = { 100, 200, 300 };
Observable<Integer> source = Observable.fromArray(arr);
source.subscribe(System.out::println);

Observable 이 generic 을 사용하기 떄문에 int[] 가 아닌 Integer[] 를 사용한다.




2.1.5. fromIterable() 함수


-

List<String> names = new ArrayList<>();
name.add(“Gamza”);
name.add(“Goguma”);

Observable<String> source = Observable.fromIterable(names);
source.subscribe(System.out::println);


-

Map 객체에 관한 Observable 클래스의 from() 함수는 없다.




2.1.6. fromCallable() 함수


-

Callable<String callable = () -> {
    Thread.sleep(1000);
    return “Hello Callable”;
};
Observable.fromCallable(callbale).subscribe(System.out::println);


2.1.9. fromFuture() 함수


-

Future<String> future = Executors.newSingleThreadExecutor().submit( () -> {
    Thread.sleep(1000);
    return “Hello Future”;
});
Observable.fromFuture(future).subscribe(System.out::println);



2.1.8. fromPublisher() 함수


-

Publisher 는 자바 9의 표준인 Flow API 의 일부이다.

Publisher<String> publisher = (Subscriber<? super String> s) -> {
    s.onNext(“Hello Observable.fromPublisher());
    s.onComplete();
};
Observable.fromPublisher(publisher).subscribe(system.out::println);



2.2. Single 클래스


-

Observable 클래스는 데이터를 무한하게 발행할 수 있지만, Single 클래스는 오직 1개의 데이터만 발행하도록 한정한다.

보통 결과가 유일한 서버 API 를 호출할 때 유용하게 사용할 수 있다.


중요한 것은 데이터 하나가 발행과 동시에 종료(onSuccess)된다는 것이다.

라이프 사이클 관점에서 onNext() 와 onComplete() 함수가 onSuccess() 함수로 통합된 것이다.

Single 클래스의 라이프 사이클 함수는 onSuccess(T value) 함수와 onError() 함수로 구성된다.




2.2.1. just() 함수


-

Single.just(“Hello Single”).subscribe(System.out::println);





2.2.2. Observable 에서 Single 클래스 사용


-

Single 은 Observable 의 특수한 형태이므로 Observable 에서 변환할 수 있다.


-

Observable<String> source = Observable.just(“Hello Single”);
Single.fromObservable(source).subscribe(System.out::println);

source.single(“default item”).subscribe(System.out::println);
// Observable 에서 값이 발행 안 되면 “default item” 이 발행된다.

String[] colors = {“Red”, “Blue”, “Green”};
Observable.fromArray(colors).first(“default value”).subscribe(System.out::println);

Observable.just(new Order(“ORD-1”), new Order(“ORD-2”))
    .take(1)
    .single(new Order(“default order”))
    .subscribe(System.out::println);



2.2.3. Single 클래스의 올바른 사용 방법


-

Observable.just(“Hello Single”, “Error”).single(“default item”).subscribe(System.out::println);

위의 코드를 실행하면 IllegalArgumentException: Sequence contains more than one element! 에러가 발생한다.


에러 메시지는 두 번째 값을 발행하면서 onNext 이벤트가 발생할 때 에러가 발생했다고 알려준다.





2.3. Maybe 클래스


-

Maybe 는 RxJava 2에 도입된 Observable 의 또 다른 특수 형태이다.

Single 클래스와 마찬가지로 최대 데이터 하나를 가질 수 있지만, 데이터 발행 없이 바로 데이터 발생을 완료할 수도 있다.

Maybe 클래스는 Single 클래스에 onComplete 이벤트가 추가된 형태이다.



-

Maybe 객체는 Maybe 클래스를 이용해 생성할 수 있지만 보통 Observable 의 특정 연산자를 통해 생성할 때가 많다.


Maybe 객체를 생성할 수 있는 리액티브 연산자에는 elementAt(), firstElement(), flatMapMaybe(), lastElement(), reduce(), singleElement() 함수 등이 있다.





2.4. 뜨거운 Observable


-

Observable 에는 Hot Observable 과 Cold Observable 이 있다.



-

Cold Observable 은 옵저버가 subscribe 함수를 호출하여 구독하지 않으면 데이터를 발행하지 않는다.

다른 말로 게으른(lazy) 접근법이다.



-

Hot Observable 은 구독자의 존재 여부와 관계없이 데이터를 발행하는 Observable 이다.

따라서 여러 구독자를 고려할 수 있다.

단, 구독자로서는 Observable 에서 발행하는 데이터를 처음부터 모두 수신할 것으로 보장할 수 없다.



-

Cold Observable 은 구독자가 구독하면 준비된 데이터를 처음부터 발행한다.

하지만 Hot Observable 은 구독한 시점에서 Observable 에서 발행한 값을 받는다.



-

Cold Observable 은 웹 요청, 데이터베이스 쿼리와 파일 읽기 등에 쓰인다.

보통 내가 원하는 URL 이나 데이터를 지정하면 그때부터 서버나 데이터베이스 서버에 요청을 보내고 결과를 받아온다.



-

Hot Observable 은 마우스 이벤트, 키보드 이벤트, 시스템 이벤트, 센서 데이터와 주식 가격 등이 있다.



-

Hot Observable 에는 주의할 점이 있다.

배압(Back pressure)을 고려해야 한다.

배압은 Observable 에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생한다.


RxJava 1.x 에서는 Observable 클래스에 별도의 배압 연산자들을 제공했지만, RxJava2 에서는 Flowable 이라는 특화 클래스에서 배압을 처리한다.





2.5. Subject 클래스


-

Subject 클래스는 Cold Observable 을 Hot Observable 로 바꿔준다.

Subject 클래스는 Observable 속성과 Subscriber 속성이 모두 있다.

Observable 처럼 데이터를 발행할 수도 있고, Subscriber 처럼 발행된 데이터를 바로 처리할 수도 있다.



-

AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject 등이 있다.




2.5.1. AsyncSubject 클래스


-

AsyncSubject 클래스는 Observable 에서 발행한 마지막 데이터를 얻어올 수 있는 Subject 클래스이다.

완료되기 전 마지막 데이터에만 관심이 있으며 이전 데이터는 무시한다.

구독자에게 데이터를 전달하지 않다가 완료됨과 동시에 구독자들에게 데이터를 발행하고 종료한다.



-

AsyncSubject<String> subject = AsyncSubject.create();
subject.subscribe( data -> System.out.println(“Subscriber #1 => “ + data));
subject.onNext(“1");
subject.onNext(“3”);
subject.subscribe( data -> System.out.println(“Subscriber #2 => “ + data));
subject.onNext(“5”);
subject.onComplete();


data 는 5만 전달된다.



-

Float[] temperature = {10.1f, 13.4f, 12.5f};
Observable<Float> source = Observable.fromArray(temperature);

AsyncSubject<Float> subject = AsyncSubject.create();
subject.subscribe( data -> System.out.println(“Subscriber #1 => “ + data) );
source.subscribe(subject);


12.5 만 찍힌다.






2.5.2. BehaviorSubject 클래스


-

BehaviorSubject 는 구독자가 구독을 하면 가장 최근 값 혹은 기본값을 넘겨주는 클래스이다.



-

BehaviorSubject<String> subject = BehaviorSubject.createDefault("6”);
subject.subscribe(data -> System.out.println(“Subs #1 => “ + data));
subject.onNext(“1”);
subject.onNext(“3”);
subject.subscribe(data -> System.out.println(“Subs #2 => “ + data));
subject.onNext(“5”);
subject.onComplete();

결과는

Subs #1 => 6

Subs #1 => 1

Subs #1 => 3

Subs #2 => 3

Subs #1 => 5

Subs #2 => 5




2.5.3. PublishSubject 클래스


-

가장 평범한 Subject 클래스이다.

구독자가 subscribe() 함수를 호출하면 값을 발행하기 시작한다.

해당 시간에 발생한 데이터를 그대로 구독자에게 전달한다.




2.5.4. ReplaySubject 클래스


-

가장 특이하고 사용할 때 주의해야 하는 녀석이다.

Subject 클래스의 목적은 Hot Observable 을 활용하는 것인데 Cold Observable 처럼 동작하기 떄문이다.



-

ReplaySubject 클래스는 구독자가 새로 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장해준다.

그러므로 모든 데이터 내용을 저장해두는 과정 중 메모리 누수가 발생할 가능성을 염두에 두고 사용할 때 주의해야 한다.



-

ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(data -> System.out.println(“Subs #1 => “ + data));
subject.onNext(“1”);
subject.onNext(“3”);
subject.subscribe(data -> System.out.println(“Subs #2 => “ + data));
subject.onNext(“5”);
subject.onComplete();

결과는..

Subs #1 => 1

Subs #1 => 3

Subs #2 => 1

Subs #2 => 3

Subs #1 => 5

Subs #2 => 5





2.6. ConnectableObservable 클래스


-

ConnectableObservable 클래스는 Subject 클래스처럼 Cold Observable 을 Hot Observable 로 변환한다.

여러 구독자에게 데이터를 동시에 전달할 때 사용한다.

특이한 점은 subscribe() 함수를 호출해도 아무 동작이 일어나지 않는다.

새로 추가된 connect() 함수는 호출한 시점부터 subscribe() 함수를 호출한 구독자에게 데이터를 발행하기 때문이다.



-

ConnectableObservable 객체를 생성하려면 먼저 Observable 에 publish() 함수를 호출해야 한다.

이 함수는 여러 구독자에게 데이터를 발행하기 위해 connect() 함수를 호출하기 전까지 데이터 발행을 유예하는 역할을 한다.


Hot observable 답게 connect() 가 불렸어도, 새로운 subscriber 에게는 새로운 data 만 전달한다.



-

String[] dt = { “1”, “3”, “5” };
Observable<String> balls = Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map(Long::intValue)
    .map(i -> dt[i])
    .take(dt.length);
ConnectableObservable<String> source = balls.publish();
source.subscribe(data -> System.out.println(“Subs #1 => “ + data));
source.subscribe(data -> System.out.println(“Subs #2 => “ + data));
source.connect(); // 이때부터 data 생성이 시작된다.

CommonUtils.sleep(250);
source.subscribe(data -> System.out.println(“Subs #3 => “ + data));
CommonUtils.sleep(100);

결과는..

Subs #1 => 1

Subs #2 => 1

Subs #1 => 3

Subs #2 => 3

Subs #1 => 5

Subs #2 => 5

Subs #3 => 5



-

interval() 함수는 테스트 코드를 작성할 때 많이 활용된다.





2.7. 마치며






반응형

댓글