프로그래밍 놀이터/안드로이드, Java

[RxJava] #2 Observable 처음 만들기

돼지왕 왕돼지 2019. 6. 3. 15:33
반응형



-

Observable 은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록 한다.

RxJava 프로그래밍은 Observable 에서 시작해 그것으로 끝난대고 해도 과언이 아닐 정도로 중요한 개념이다.



-

Rajava 1.x 에서는 데이터 소스를 Observable 과 Single 클래스로 구성했다.

RxJava 2 에서는 Observable 클래스를 상황에 맞게 세분화해 각각 Observable, Maybe, Flowable 로 나뉘었고, Single 도 그대로 존재한다.



-

Maybe 클래스는 Reduce() 함수나 firstElement() 함수와 같이 데이터가 발행될 수 있거나 혹은 발행되지 않고도 완료되는 경우를 의미한다.



-

Flowable 클래스는 Observable 에서 데이터가 발행되는 속도가 구독자가 처리하는 속도보다 현저하게 빠른 경우 발생하는 배압(Back pressure)이슈에 대응하는 기능을 추가로 제공한다.





2.1. Observable 클래스


-

Observable 은 옵저버 패턴을 구현한다.

옵저버 패턴은 객체의 상태 변화를 관찰하는 관찰자(옵저버) 목록을 객체에 등록한다.

그리고 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵저버에게 변화를 알려준다.

라이프 사이클은 존재하지 않고 보통 단일 함수를 통해 변화만 알린다.



-

RxJava 의 Observable 은 세 가지 알림을 구독자에게 전달한다.


onNext: 데이터의 발행을 알린다. 기존의 옵저버 패턴과 같다.

onComplete: 모든 데이터의 발행을 완료했음을 알린다. 단 한번만 발생하며, 발생한 후에는 더 이상 onNext 이벤트가 발생해서는 안 된다.

onError : 어떤 이유로 에러가 발생했음을 알린다. 이후에 onNext 및 onComplete 이벤트가 발생하지 않는다. 즉, Observable 의 실행을 종료한다.



-

Observable 클래스에는 Observable 을 생성하는 팩토리 함수, 중간 결과를 처리하는 함수, 디버그 및 예외 처리 함수가 모두 포함되어 있다.



-

1.x 부터 있던 팩토리 메소드 : create(), just(), from()

2.x 추가 팩토리 메소드 : fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher()

기타 팩토리 메소드 : interval(), range(), timer(), defer()




2.1.1. just() 함수


-

just() 함수는 인자로 넣은 데이터를 차례로 발행하는 Observable 을 생성한다.

실제 데이터의 발행은 subscribe() 함수를 호출해야 시작한다.

한 개의 값을 넣을 수도 있고 인자로 여러개의 값(최대 10개)을 넣을 수도 있다.

단, 타입은 모두 같아야 한다.




2.1.2. subscribe() 함수와 Disposable 객체


-

RxJava 는 선언형 프로그래밍을 지향한다.

선언형 프로그래밍은 명령형 프로그래밍(Imperative) 의 반대말로 어떤 방법(how)으로 동작하는지가 아니라 프로그래밍할 대상이 무엇(what)인지 알려주는 것을 의미한다.

예를 들어 명령형에서는 실행할 알고리즘과 동작을 구체적으로 명시하지만, 선언형은 목표를 명시할 뿐 실행할 알고리즘을 명시하지 않는다.



-

Disposable subscribe()

    onNext() 와 onComplete() 이벤트를 무시하고 onError 이벤트가 발생했을 때만 OnErrorNotImplementedException 을 던진다. 따라서 Observable 로 작성한 코드를 테스트하거나 디버깅할 떄 활용한다.


Disposable subscribe(Consumer<? super T> onNext)

    onNext 이벤트를 처리한다. onError 이벤트가 발생하면 OnErrorNotImplementedException 을 던진다.


Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

    onNext 와 onError 이벤트를 처리한다.


Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)

    onNext, onError, onComplete 모두 처리할 수 있다.



-

Disposable 은 RxJava 1.x 의 Subscription(구독) 객체에 해당한다.

dispose(), isDisposed() 두개의 함수만 지원한다.


dispose() 는 구독을 해지하는 함수이다.

Observable 계약에 따르면 Observable이 onComplete 알림을 보냈을 때 자동으로 dispose() 를 호출해 Observable 과 구독자의 관계를 끊는다.






2.1.3. create() 함수


-

just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지만, create() 함수는 onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야 한다.


데이터를 발행할 때 onNext() 를 호출해주어야 하며, 해야 한다.



-

Observable<T> create(ObservableOnSubscribe<T> source)

public interface ObservableOnSubscribe<T>{
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

ex)
Observable<Integer> source = Observable.create( (ObservableEmitter<Integer> emitter) -> {
    emitter.onNext(100);
    emitter.onNext(200);
    emitter.onNext(300);
    emitter.onComplete();
});
source.subscribe(System.out::println);

-

create 로 만드는 Observable 은 cold observable 이다.

이는 observable 의 생성만으로 데이터를 발행하지 않고, subscribe() 를 호출했을 때 값을 발행한다는 이야기다.



-

Observable.create() 를 사용할 때는 주의해야 한다. 

1. Observable 이 구독 해지(dispose)되었을 때 등록된 콜백을 모두 해제해야 한다. 그렇지 않으면 memory leak 이 발생한다.

2. 구독하는 동안에만 onNext 와 onComplete 이벤트를 호출해야 한다.

3. 에러가 발생했을 때는 오직 onError 이벤트로만 에러를 전달해야 한다.

4. 배입(back pressure)를 직접 처리해야 한다.




2.1.4. fromArray() 함수


-

단일 데이터가 아닐 때는 fromXXX() 계열 함수를 사용한다.

RxJava 1.x 에서는 from() 과 fromCallable() 함수만 사용했다.

그런데 from() 함수를 배열, 반복자, 비동기 계산 등에 모두 사용하다 보니 모호함이 있었다.

따라서 RxJava 2 에서 from 함수를 세분화했다.



-

Integer[] arr = { 100, 200, 300 };
Observable<Integer> source = Observable.fromArray(arr);
source.subscribe(System.out::println);

Observable 이 generic 을 사용하기 떄문에 int[] 가 아닌 Integer[] 를 사용한다.




2.1.5. fromIterable() 함수


-

List<String> names = new ArrayList<>();
name.add(“Gamza”);
name.add(“Goguma”);

Observable<String> source = Observable.fromIterable(names);
source.subscribe(System.out::println);


-

Map 객체에 관한 Observable 클래스의 from() 함수는 없다.




2.1.6. fromCallable() 함수


-

Callable<String callable = () -> {
    Thread.sleep(1000);
    return “Hello Callable”;
};
Observable.fromCallable(callbale).subscribe(System.out::println);


2.1.9. fromFuture() 함수


-

Future<String> future = Executors.newSingleThreadExecutor().submit( () -> {
    Thread.sleep(1000);
    return “Hello Future”;
});
Observable.fromFuture(future).subscribe(System.out::println);



2.1.8. fromPublisher() 함수


-

Publisher 는 자바 9의 표준인 Flow API 의 일부이다.

Publisher<String> publisher = (Subscriber<? super String> s) -> {
    s.onNext(“Hello Observable.fromPublisher());
    s.onComplete();
};
Observable.fromPublisher(publisher).subscribe(system.out::println);



2.2. Single 클래스


-

Observable 클래스는 데이터를 무한하게 발행할 수 있지만, Single 클래스는 오직 1개의 데이터만 발행하도록 한정한다.

보통 결과가 유일한 서버 API 를 호출할 때 유용하게 사용할 수 있다.


중요한 것은 데이터 하나가 발행과 동시에 종료(onSuccess)된다는 것이다.

라이프 사이클 관점에서 onNext() 와 onComplete() 함수가 onSuccess() 함수로 통합된 것이다.

Single 클래스의 라이프 사이클 함수는 onSuccess(T value) 함수와 onError() 함수로 구성된다.




2.2.1. just() 함수


-

Single.just(“Hello Single”).subscribe(System.out::println);





2.2.2. Observable 에서 Single 클래스 사용


-

Single 은 Observable 의 특수한 형태이므로 Observable 에서 변환할 수 있다.


-

Observable<String> source = Observable.just(“Hello Single”);
Single.fromObservable(source).subscribe(System.out::println);

source.single(“default item”).subscribe(System.out::println);
// Observable 에서 값이 발행 안 되면 “default item” 이 발행된다.

String[] colors = {“Red”, “Blue”, “Green”};
Observable.fromArray(colors).first(“default value”).subscribe(System.out::println);

Observable.just(new Order(“ORD-1”), new Order(“ORD-2”))
    .take(1)
    .single(new Order(“default order”))
    .subscribe(System.out::println);



2.2.3. Single 클래스의 올바른 사용 방법


-

Observable.just(“Hello Single”, “Error”).single(“default item”).subscribe(System.out::println);

위의 코드를 실행하면 IllegalArgumentException: Sequence contains more than one element! 에러가 발생한다.


에러 메시지는 두 번째 값을 발행하면서 onNext 이벤트가 발생할 때 에러가 발생했다고 알려준다.





2.3. Maybe 클래스


-

Maybe 는 RxJava 2에 도입된 Observable 의 또 다른 특수 형태이다.

Single 클래스와 마찬가지로 최대 데이터 하나를 가질 수 있지만, 데이터 발행 없이 바로 데이터 발생을 완료할 수도 있다.

Maybe 클래스는 Single 클래스에 onComplete 이벤트가 추가된 형태이다.



-

Maybe 객체는 Maybe 클래스를 이용해 생성할 수 있지만 보통 Observable 의 특정 연산자를 통해 생성할 때가 많다.


Maybe 객체를 생성할 수 있는 리액티브 연산자에는 elementAt(), firstElement(), flatMapMaybe(), lastElement(), reduce(), singleElement() 함수 등이 있다.





2.4. 뜨거운 Observable


-

Observable 에는 Hot Observable 과 Cold Observable 이 있다.



-

Cold Observable 은 옵저버가 subscribe 함수를 호출하여 구독하지 않으면 데이터를 발행하지 않는다.

다른 말로 게으른(lazy) 접근법이다.



-

Hot Observable 은 구독자의 존재 여부와 관계없이 데이터를 발행하는 Observable 이다.

따라서 여러 구독자를 고려할 수 있다.

단, 구독자로서는 Observable 에서 발행하는 데이터를 처음부터 모두 수신할 것으로 보장할 수 없다.



-

Cold Observable 은 구독자가 구독하면 준비된 데이터를 처음부터 발행한다.

하지만 Hot Observable 은 구독한 시점에서 Observable 에서 발행한 값을 받는다.



-

Cold Observable 은 웹 요청, 데이터베이스 쿼리와 파일 읽기 등에 쓰인다.

보통 내가 원하는 URL 이나 데이터를 지정하면 그때부터 서버나 데이터베이스 서버에 요청을 보내고 결과를 받아온다.



-

Hot Observable 은 마우스 이벤트, 키보드 이벤트, 시스템 이벤트, 센서 데이터와 주식 가격 등이 있다.



-

Hot Observable 에는 주의할 점이 있다.

배압(Back pressure)을 고려해야 한다.

배압은 Observable 에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생한다.


RxJava 1.x 에서는 Observable 클래스에 별도의 배압 연산자들을 제공했지만, RxJava2 에서는 Flowable 이라는 특화 클래스에서 배압을 처리한다.





2.5. Subject 클래스


-

Subject 클래스는 Cold Observable 을 Hot Observable 로 바꿔준다.

Subject 클래스는 Observable 속성과 Subscriber 속성이 모두 있다.

Observable 처럼 데이터를 발행할 수도 있고, Subscriber 처럼 발행된 데이터를 바로 처리할 수도 있다.



-

AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject 등이 있다.




2.5.1. AsyncSubject 클래스


-

AsyncSubject 클래스는 Observable 에서 발행한 마지막 데이터를 얻어올 수 있는 Subject 클래스이다.

완료되기 전 마지막 데이터에만 관심이 있으며 이전 데이터는 무시한다.

구독자에게 데이터를 전달하지 않다가 완료됨과 동시에 구독자들에게 데이터를 발행하고 종료한다.



-

AsyncSubject<String> subject = AsyncSubject.create();
subject.subscribe( data -> System.out.println(“Subscriber #1 => “ + data));
subject.onNext(“1");
subject.onNext(“3”);
subject.subscribe( data -> System.out.println(“Subscriber #2 => “ + data));
subject.onNext(“5”);
subject.onComplete();


data 는 5만 전달된다.



-

Float[] temperature = {10.1f, 13.4f, 12.5f};
Observable<Float> source = Observable.fromArray(temperature);

AsyncSubject<Float> subject = AsyncSubject.create();
subject.subscribe( data -> System.out.println(“Subscriber #1 => “ + data) );
source.subscribe(subject);


12.5 만 찍힌다.






2.5.2. BehaviorSubject 클래스


-

BehaviorSubject 는 구독자가 구독을 하면 가장 최근 값 혹은 기본값을 넘겨주는 클래스이다.



-

BehaviorSubject<String> subject = BehaviorSubject.createDefault("6”);
subject.subscribe(data -> System.out.println(“Subs #1 => “ + data));
subject.onNext(“1”);
subject.onNext(“3”);
subject.subscribe(data -> System.out.println(“Subs #2 => “ + data));
subject.onNext(“5”);
subject.onComplete();

결과는

Subs #1 => 6

Subs #1 => 1

Subs #1 => 3

Subs #2 => 3

Subs #1 => 5

Subs #2 => 5




2.5.3. PublishSubject 클래스


-

가장 평범한 Subject 클래스이다.

구독자가 subscribe() 함수를 호출하면 값을 발행하기 시작한다.

해당 시간에 발생한 데이터를 그대로 구독자에게 전달한다.




2.5.4. ReplaySubject 클래스


-

가장 특이하고 사용할 때 주의해야 하는 녀석이다.

Subject 클래스의 목적은 Hot Observable 을 활용하는 것인데 Cold Observable 처럼 동작하기 떄문이다.



-

ReplaySubject 클래스는 구독자가 새로 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장해준다.

그러므로 모든 데이터 내용을 저장해두는 과정 중 메모리 누수가 발생할 가능성을 염두에 두고 사용할 때 주의해야 한다.



-

ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(data -> System.out.println(“Subs #1 => “ + data));
subject.onNext(“1”);
subject.onNext(“3”);
subject.subscribe(data -> System.out.println(“Subs #2 => “ + data));
subject.onNext(“5”);
subject.onComplete();

결과는..

Subs #1 => 1

Subs #1 => 3

Subs #2 => 1

Subs #2 => 3

Subs #1 => 5

Subs #2 => 5





2.6. ConnectableObservable 클래스


-

ConnectableObservable 클래스는 Subject 클래스처럼 Cold Observable 을 Hot Observable 로 변환한다.

여러 구독자에게 데이터를 동시에 전달할 때 사용한다.

특이한 점은 subscribe() 함수를 호출해도 아무 동작이 일어나지 않는다.

새로 추가된 connect() 함수는 호출한 시점부터 subscribe() 함수를 호출한 구독자에게 데이터를 발행하기 때문이다.



-

ConnectableObservable 객체를 생성하려면 먼저 Observable 에 publish() 함수를 호출해야 한다.

이 함수는 여러 구독자에게 데이터를 발행하기 위해 connect() 함수를 호출하기 전까지 데이터 발행을 유예하는 역할을 한다.


Hot observable 답게 connect() 가 불렸어도, 새로운 subscriber 에게는 새로운 data 만 전달한다.



-

String[] dt = { “1”, “3”, “5” };
Observable<String> balls = Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map(Long::intValue)
    .map(i -> dt[i])
    .take(dt.length);
ConnectableObservable<String> source = balls.publish();
source.subscribe(data -> System.out.println(“Subs #1 => “ + data));
source.subscribe(data -> System.out.println(“Subs #2 => “ + data));
source.connect(); // 이때부터 data 생성이 시작된다.

CommonUtils.sleep(250);
source.subscribe(data -> System.out.println(“Subs #3 => “ + data));
CommonUtils.sleep(100);

결과는..

Subs #1 => 1

Subs #2 => 1

Subs #1 => 3

Subs #2 => 3

Subs #1 => 5

Subs #2 => 5

Subs #3 => 5



-

interval() 함수는 테스트 코드를 작성할 때 많이 활용된다.





2.7. 마치며






반응형