Observable 은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록 한다.
RxJava 프로그래밍은 Observable 에서 시작해 그것으로 끝난대고 해도 과언이 아닐 정도로 중요한 개념이다.
-
Rajava 1.x 에서는 데이터 소스를 Observable 과 Single 클래스로 구성했다.
RxJava 2 에서는 Observable 클래스를 상황에 맞게 세분화해 각각 Observable, Maybe, Flowable 로 나뉘었고, Single 도 그대로 존재한다.
-
Maybe 클래스는 Reduce() 함수나 firstElement() 함수와 같이 데이터가 발행될 수 있거나 혹은 발행되지 않고도 완료되는 경우를 의미한다.
-
Flowable 클래스는 Observable 에서 데이터가 발행되는 속도가 구독자가 처리하는 속도보다 현저하게 빠른 경우 발생하는 배압(Back pressure)이슈에 대응하는 기능을 추가로 제공한다.
2.1. Observable 클래스
-
Observable 은 옵저버 패턴을 구현한다.
옵저버 패턴은 객체의 상태 변화를 관찰하는 관찰자(옵저버) 목록을 객체에 등록한다.
그리고 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵저버에게 변화를 알려준다.
라이프 사이클은 존재하지 않고 보통 단일 함수를 통해 변화만 알린다.
-
RxJava 의 Observable 은 세 가지 알림을 구독자에게 전달한다.
onNext: 데이터의 발행을 알린다. 기존의 옵저버 패턴과 같다.
onComplete: 모든 데이터의 발행을 완료했음을 알린다. 단 한번만 발생하며, 발생한 후에는 더 이상 onNext 이벤트가 발생해서는 안 된다.
onError : 어떤 이유로 에러가 발생했음을 알린다. 이후에 onNext 및 onComplete 이벤트가 발생하지 않는다. 즉, Observable 의 실행을 종료한다.
-
Observable 클래스에는 Observable 을 생성하는 팩토리 함수, 중간 결과를 처리하는 함수, 디버그 및 예외 처리 함수가 모두 포함되어 있다.
-
1.x 부터 있던 팩토리 메소드 : create(), just(), from()
2.x 추가 팩토리 메소드 : fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher()
기타 팩토리 메소드 : interval(), range(), timer(), defer() 등
2.1.1. just() 함수
-
just() 함수는 인자로 넣은 데이터를 차례로 발행하는 Observable 을 생성한다.
실제 데이터의 발행은 subscribe() 함수를 호출해야 시작한다.
한 개의 값을 넣을 수도 있고 인자로 여러개의 값(최대 10개)을 넣을 수도 있다.
단, 타입은 모두 같아야 한다.
2.1.2. subscribe() 함수와 Disposable 객체
-
RxJava 는 선언형 프로그래밍을 지향한다.
선언형 프로그래밍은 명령형 프로그래밍(Imperative) 의 반대말로 어떤 방법(how)으로 동작하는지가 아니라 프로그래밍할 대상이 무엇(what)인지 알려주는 것을 의미한다.
예를 들어 명령형에서는 실행할 알고리즘과 동작을 구체적으로 명시하지만, 선언형은 목표를 명시할 뿐 실행할 알고리즘을 명시하지 않는다.
-
Disposable subscribe()
onNext() 와 onComplete() 이벤트를 무시하고 onError 이벤트가 발생했을 때만 OnErrorNotImplementedException 을 던진다. 따라서 Observable 로 작성한 코드를 테스트하거나 디버깅할 떄 활용한다.
Disposable subscribe(Consumer<? super T> onNext)
onNext 이벤트를 처리한다. onError 이벤트가 발생하면 OnErrorNotImplementedException 을 던진다.
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
onNext 와 onError 이벤트를 처리한다.
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
onNext, onError, onComplete 모두 처리할 수 있다.
-
Disposable 은 RxJava 1.x 의 Subscription(구독) 객체에 해당한다.
dispose(), isDisposed() 두개의 함수만 지원한다.
dispose() 는 구독을 해지하는 함수이다.
Observable 계약에 따르면 Observable이 onComplete 알림을 보냈을 때 자동으로 dispose() 를 호출해 Observable 과 구독자의 관계를 끊는다.
2.1.3. create() 함수
-
just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지만, create() 함수는 onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야 한다.
데이터를 발행할 때 onNext() 를 호출해주어야 하며, 해야 한다.
-
Observable<T> create(ObservableOnSubscribe<T> source) public interface ObservableOnSubscribe<T>{ void subscribe(ObservableEmitter<T> e) throws Exception; }
Observable<Integer> source = Observable.create( (ObservableEmitter<Integer> emitter) -> { emitter.onNext(100); emitter.onNext(200); emitter.onNext(300); emitter.onComplete(); }); source.subscribe(System.out::println);
-
create 로 만드는 Observable 은 cold observable 이다.
이는 observable 의 생성만으로 데이터를 발행하지 않고, subscribe() 를 호출했을 때 값을 발행한다는 이야기다.
-
Observable.create() 를 사용할 때는 주의해야 한다.
1. Observable 이 구독 해지(dispose)되었을 때 등록된 콜백을 모두 해제해야 한다. 그렇지 않으면 memory leak 이 발생한다.
2. 구독하는 동안에만 onNext 와 onComplete 이벤트를 호출해야 한다.
3. 에러가 발생했을 때는 오직 onError 이벤트로만 에러를 전달해야 한다.
4. 배입(back pressure)를 직접 처리해야 한다.
2.1.4. fromArray() 함수
-
단일 데이터가 아닐 때는 fromXXX() 계열 함수를 사용한다.
RxJava 1.x 에서는 from() 과 fromCallable() 함수만 사용했다.
그런데 from() 함수를 배열, 반복자, 비동기 계산 등에 모두 사용하다 보니 모호함이 있었다.
따라서 RxJava 2 에서 from 함수를 세분화했다.
-
Integer[] arr = { 100, 200, 300 }; Observable<Integer> source = Observable.fromArray(arr); source.subscribe(System.out::println);
Observable 이 generic 을 사용하기 떄문에 int[] 가 아닌 Integer[] 를 사용한다.
2.1.5. fromIterable() 함수
-
List<String> names = new ArrayList<>(); name.add(“Gamza”); name.add(“Goguma”); Observable<String> source = Observable.fromIterable(names); source.subscribe(System.out::println);
-
Map 객체에 관한 Observable 클래스의 from() 함수는 없다.
2.1.6. fromCallable() 함수
-
Callable<String callable = () -> { Thread.sleep(1000); return “Hello Callable”; }; Observable.fromCallable(callbale).subscribe(System.out::println);
2.1.9. fromFuture() 함수
-
Future<String> future = Executors.newSingleThreadExecutor().submit( () -> { Thread.sleep(1000); return “Hello Future”; }); Observable.fromFuture(future).subscribe(System.out::println);
2.1.8. fromPublisher() 함수
-
Publisher 는 자바 9의 표준인 Flow API 의 일부이다.
Publisher<String> publisher = (Subscriber<? super String> s) -> { s.onNext(“Hello Observable.fromPublisher()); s.onComplete(); }; Observable.fromPublisher(publisher).subscribe(system.out::println);
2.2. Single 클래스
-
Observable 클래스는 데이터를 무한하게 발행할 수 있지만, Single 클래스는 오직 1개의 데이터만 발행하도록 한정한다.
보통 결과가 유일한 서버 API 를 호출할 때 유용하게 사용할 수 있다.
중요한 것은 데이터 하나가 발행과 동시에 종료(onSuccess)된다는 것이다.
라이프 사이클 관점에서 onNext() 와 onComplete() 함수가 onSuccess() 함수로 통합된 것이다.
Single 클래스의 라이프 사이클 함수는 onSuccess(T value) 함수와 onError() 함수로 구성된다.
2.2.1. just() 함수
-
Single.just(“Hello Single”).subscribe(System.out::println);
2.2.2. Observable 에서 Single 클래스 사용
-
Single 은 Observable 의 특수한 형태이므로 Observable 에서 변환할 수 있다.
-
Observable<String> source = Observable.just(“Hello Single”); Single.fromObservable(source).subscribe(System.out::println); source.single(“default item”).subscribe(System.out::println); // Observable 에서 값이 발행 안 되면 “default item” 이 발행된다. String[] colors = {“Red”, “Blue”, “Green”}; Observable.fromArray(colors).first(“default value”).subscribe(System.out::println); Observable.just(new Order(“ORD-1”), new Order(“ORD-2”)) .take(1) .single(new Order(“default order”)) .subscribe(System.out::println);
2.2.3. Single 클래스의 올바른 사용 방법
-
Observable.just(“Hello Single”, “Error”).single(“default item”).subscribe(System.out::println);
위의 코드를 실행하면 IllegalArgumentException: Sequence contains more than one element! 에러가 발생한다.
에러 메시지는 두 번째 값을 발행하면서 onNext 이벤트가 발생할 때 에러가 발생했다고 알려준다.
2.3. Maybe 클래스
-
Maybe 는 RxJava 2에 도입된 Observable 의 또 다른 특수 형태이다.
Single 클래스와 마찬가지로 최대 데이터 하나를 가질 수 있지만, 데이터 발행 없이 바로 데이터 발생을 완료할 수도 있다.
Maybe 클래스는 Single 클래스에 onComplete 이벤트가 추가된 형태이다.
-
Maybe 객체는 Maybe 클래스를 이용해 생성할 수 있지만 보통 Observable 의 특정 연산자를 통해 생성할 때가 많다.
Maybe 객체를 생성할 수 있는 리액티브 연산자에는 elementAt(), firstElement(), flatMapMaybe(), lastElement(), reduce(), singleElement() 함수 등이 있다.
2.4. 뜨거운 Observable
-
Observable 에는 Hot Observable 과 Cold Observable 이 있다.
-
Cold Observable 은 옵저버가 subscribe 함수를 호출하여 구독하지 않으면 데이터를 발행하지 않는다.
다른 말로 게으른(lazy) 접근법이다.
-
Hot Observable 은 구독자의 존재 여부와 관계없이 데이터를 발행하는 Observable 이다.
따라서 여러 구독자를 고려할 수 있다.
단, 구독자로서는 Observable 에서 발행하는 데이터를 처음부터 모두 수신할 것으로 보장할 수 없다.
-
Cold Observable 은 구독자가 구독하면 준비된 데이터를 처음부터 발행한다.
하지만 Hot Observable 은 구독한 시점에서 Observable 에서 발행한 값을 받는다.
-
Cold Observable 은 웹 요청, 데이터베이스 쿼리와 파일 읽기 등에 쓰인다.
보통 내가 원하는 URL 이나 데이터를 지정하면 그때부터 서버나 데이터베이스 서버에 요청을 보내고 결과를 받아온다.
-
Hot Observable 은 마우스 이벤트, 키보드 이벤트, 시스템 이벤트, 센서 데이터와 주식 가격 등이 있다.
-
Hot Observable 에는 주의할 점이 있다.
배압(Back pressure)을 고려해야 한다.
배압은 Observable 에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생한다.
RxJava 1.x 에서는 Observable 클래스에 별도의 배압 연산자들을 제공했지만, RxJava2 에서는 Flowable 이라는 특화 클래스에서 배압을 처리한다.
2.5. Subject 클래스
-
Subject 클래스는 Cold Observable 을 Hot Observable 로 바꿔준다.
Subject 클래스는 Observable 속성과 Subscriber 속성이 모두 있다.
Observable 처럼 데이터를 발행할 수도 있고, Subscriber 처럼 발행된 데이터를 바로 처리할 수도 있다.
-
AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject 등이 있다.
2.5.1. AsyncSubject 클래스
-
AsyncSubject 클래스는 Observable 에서 발행한 마지막 데이터를 얻어올 수 있는 Subject 클래스이다.
완료되기 전 마지막 데이터에만 관심이 있으며 이전 데이터는 무시한다.
구독자에게 데이터를 전달하지 않다가 완료됨과 동시에 구독자들에게 데이터를 발행하고 종료한다.
-
AsyncSubject<String> subject = AsyncSubject.create(); subject.subscribe( data -> System.out.println(“Subscriber #1 => “ + data)); subject.onNext(“1"); subject.onNext(“3”); subject.subscribe( data -> System.out.println(“Subscriber #2 => “ + data)); subject.onNext(“5”); subject.onComplete();
data 는 5만 전달된다.
-
Float[] temperature = {10.1f, 13.4f, 12.5f}; Observable<Float> source = Observable.fromArray(temperature); AsyncSubject<Float> subject = AsyncSubject.create(); subject.subscribe( data -> System.out.println(“Subscriber #1 => “ + data) ); source.subscribe(subject);
12.5 만 찍힌다.
2.5.2. BehaviorSubject 클래스
-
BehaviorSubject 는 구독자가 구독을 하면 가장 최근 값 혹은 기본값을 넘겨주는 클래스이다.
-
BehaviorSubject<String> subject = BehaviorSubject.createDefault("6”); subject.subscribe(data -> System.out.println(“Subs #1 => “ + data)); subject.onNext(“1”); subject.onNext(“3”); subject.subscribe(data -> System.out.println(“Subs #2 => “ + data)); subject.onNext(“5”); subject.onComplete();
Subs #1 => 6
Subs #1 => 1
Subs #1 => 3
Subs #2 => 3
Subs #1 => 5
Subs #2 => 5
2.5.3. PublishSubject 클래스
-
가장 평범한 Subject 클래스이다.
구독자가 subscribe() 함수를 호출하면 값을 발행하기 시작한다.
해당 시간에 발생한 데이터를 그대로 구독자에게 전달한다.
2.5.4. ReplaySubject 클래스
-
가장 특이하고 사용할 때 주의해야 하는 녀석이다.
Subject 클래스의 목적은 Hot Observable 을 활용하는 것인데 Cold Observable 처럼 동작하기 떄문이다.
-
ReplaySubject 클래스는 구독자가 새로 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장해준다.
그러므로 모든 데이터 내용을 저장해두는 과정 중 메모리 누수가 발생할 가능성을 염두에 두고 사용할 때 주의해야 한다.
-
ReplaySubject<String> subject = ReplaySubject.create(); subject.subscribe(data -> System.out.println(“Subs #1 => “ + data)); subject.onNext(“1”); subject.onNext(“3”); subject.subscribe(data -> System.out.println(“Subs #2 => “ + data)); subject.onNext(“5”); subject.onComplete();
결과는..
Subs #1 => 1
Subs #1 => 3
Subs #2 => 1
Subs #2 => 3
Subs #1 => 5
Subs #2 => 5
2.6. ConnectableObservable 클래스
-
ConnectableObservable 클래스는 Subject 클래스처럼 Cold Observable 을 Hot Observable 로 변환한다.
여러 구독자에게 데이터를 동시에 전달할 때 사용한다.
특이한 점은 subscribe() 함수를 호출해도 아무 동작이 일어나지 않는다.
새로 추가된 connect() 함수는 호출한 시점부터 subscribe() 함수를 호출한 구독자에게 데이터를 발행하기 때문이다.
-
ConnectableObservable 객체를 생성하려면 먼저 Observable 에 publish() 함수를 호출해야 한다.
이 함수는 여러 구독자에게 데이터를 발행하기 위해 connect() 함수를 호출하기 전까지 데이터 발행을 유예하는 역할을 한다.
Hot observable 답게 connect() 가 불렸어도, 새로운 subscriber 에게는 새로운 data 만 전달한다.
-
String[] dt = { “1”, “3”, “5” }; Observable<String> balls = Observable.interval(100L, TimeUnit.MILLISECONDS) .map(Long::intValue) .map(i -> dt[i]) .take(dt.length); ConnectableObservable<String> source = balls.publish(); source.subscribe(data -> System.out.println(“Subs #1 => “ + data)); source.subscribe(data -> System.out.println(“Subs #2 => “ + data)); source.connect(); // 이때부터 data 생성이 시작된다. CommonUtils.sleep(250); source.subscribe(data -> System.out.println(“Subs #3 => “ + data)); CommonUtils.sleep(100);
결과는..
Subs #1 => 1
Subs #2 => 1
Subs #1 => 3
Subs #2 => 3
Subs #1 => 5
Subs #2 => 5
Subs #3 => 5
-
interval() 함수는 테스트 코드를 작성할 때 많이 활용된다.
2.7. 마치며
'프로그래밍 놀이터 > 안드로이드, Java' 카테고리의 다른 글
[RxJava] #4 리액티브 연산자의 활용 (0) | 2019.06.05 |
---|---|
[RxJava] #3 리액티브 연산자 입문 (0) | 2019.06.04 |
[RxJava] #1 리액티브 프로그래밍 소개 (0) | 2019.05.31 |
[android] Logcat 에서 로그를 제대로 찍지 않아요! (4) | 2019.05.24 |
[android] dialog style 속성 (0) | 2019.04.27 |
댓글