본문 바로가기
프로그래밍 놀이터/Kotlin, Coroutine

[coroutine] Flow vs RxJava

by 돼지왕 왕돼지 2021. 5. 10.
반응형

 

-

이 글은 RxJava 는 써보고, Flow 는 안 써본 개발자 입장에서 정리한 글.

 

 

 

Core functionality

 

-

view state 와 effects 를 만들고 업데이트 하기 위해 coroutine 1.3.6 부터는 StateFlow 와 MutableStateFlow 를 제공한다.

 

 

 

Retrofit

 

-

// Flow
interface FeedService {
    @GET("lists/{listType}")
    suspend fun getTweets( // suspend 를 붙여준다.
        @Path(LIST_TYPE_PATH listType:String, 
        @Query(LIST_ID_QUERY) listId:String,
        @Query(LIST_COUNT_QUERY) count:String,
        @Query(LIST_PAGE_NUM_QUERY) page:String
    ) : List<Tweet> // 바로 List 로 받을 수 있다.
}

 

 

-

// Rx
interface FeedService {
    @GET("lists/{listType}")
    fun getTweets( // suspend 가 없다.
        @Path(LIST_TYPE_PATH listType:String, 
        @Query(LIST_ID_QUERY) listId:String,
        @Query(LIST_COUNT_QUERY) count:String,
        @Query(LIST_PAGE_NUM_QUERY) page:String
    ) : Observable<List<Tweet>> // Observable 의 형태로 받는다.
}

 

 

 

Backpressure

 

-

Backpressure 는 지속적으로 입력되는 데이터 양이 많은 경우, 이를 처리하는데 시간이 오래 걸려 UI 가 느려지거나, 앱이 죽거나, 중요한 정보가 상실되는가 등에 대한 대응을 필요로 함을 이야기한다.

 

 

-

Flow 는 기본적으로 sequence emit 을 보장한다. (Rx 의 BUFFER 와 같다) 추가적으로 buffer, conflate, collectLatest 등으로 backpressure 를 다루는 방법을 제공한다.

 

buffer 는 source data 를 병렬적으로 처리한다. 상황에 따라 몇 개는 preprocess 되어 기다리고 있을 수 있다. 최종 결과는 순서가 보장된다.

conflate 는 제 때 처리하지 못한 data 들 중 마지막 값을 emit 해준다. Rx 의 LATEST 와 동일하다.

collectLatest 는 새로운 데이터가 emit 될 때 느린 collector 를 취소시키거나 재시작시킨다. (suspend function 이 cancel 되면서 바로 다음 값을 처리한다고 보면 되겠다.)

참고 : https://aroundck.tistory.com/6670

 

 

-

Rx 는 Flowable 로 backpressure 를 처리한다.

BackpressureStrategy.BUFFER 는 subscriber 가 consume 할 때까지 기다렸다가 emit 해준다. (coroutine 의 기본과 같다.)

BackpressureStrategy.DROP 은 제 때 처리되지 못하는 data 를 emit 단계에서 drop 시킨다. 즉 subscriber 가 다음 값을 request 하는 시점에 값이 없을 수 있다.

BackpressureStrategy.LATEST 는 제 때 처리되지 못한 data 의 마지막 값을 emit 해준다. Flow 의 conflate 와 동일하다.

참고 : https://aroundck.tistory.com/6243

 

 

 

Process Data

 

-

Flow 가 init 될 때 launchIn 을 통해 lifecycle 이 정의된다.

viewModelScope 는 기본적으로 Dispatchers.Main.immediate thread 를 사용하며, android ViewModel 과 생명주기를 함께 한다.

그래서 main thread 에서 view state 를 return 하는 것이면 충분하다.

feedRepo.initFeed().onEach { results ->
    when (results.status) {
        LOADING -> // do sth..
        SUCCESS -> // do sth with results.data
        ERROR -> // do sth..
    }
}.launchIn(viewModelScope)

 

 

-

Dispatchers.Main 은 launch 등을 수행했을 대 다음 Main thread 의 cycle 에 로직을 수행한다면, Dispatchers.Main.immediate 는 Main 일 경우 바로 로직을 수행시킨다.

 

 

-

Rx 의 경우는 CompositeDisposable 등을 만들어서 그곳에서 관리를 해주어야 한다.

private val disposables = CompositeDisposable()

private fun initFeed(){
    val disposable = feedRepo.initFeed()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { results ->
            when(results.status) {
                LOADING -> // do sth..
                SUCCESS -> // do wth with results.data
                ERROR -> // do sth..
            }
        }
    disposables.add(disposable)
}

private clearDisposables(){
    disposables.clear()
}

 

 

 

View states - States + effects

 

-

View states 는 최종적으로 보여줄 data 를 말하고, View effects 는 error 등의 일회적인 성격의 상황을 이야기한다.

 

 

-

Flow 를 이용하면 다음과 같이 할 수 있다.

// MyViewModel.kt
data class _FeedViewState(
    val _feed:MutableStateFlow<List<Tweet>?> = MutableStateFlow(null)
)

// MutableStateFlow 가 아닌 Flow 를 전달하기 위함용인듯..
data class FeedViewState(private val _viewState:_FeedViewState){
    val feed:Flow<List<Tweet>?> = _viewState._feed.filterNotNull()
}

private val _viewState = _FeedViewState()
val viewState = FeedViewState(_viewState)

init{
    initFeed()
}

private fun initFeed(){
    feedRepo.initFeed().onEach{ results ->
        when (results.status){
            ...
            SUCCESS -> {
                ...
                viewState.feed.value = results.data
            }
            ...
        }
    }
    .flowOn(Dispatchers.IO)
    .launchIn(viewModelScope)
}

// MyView.kt
private fun initViewStates(){
    viewModel.viewState.feed.onEach { list ->
        adapter.submitList(list)
    }.launchIn(lifecycleScope)
}

 

 

-

참고 자료 : https://proandroiddev.com/udf-flowvsrx-a792b946d75c

 

끝!

 

 

 

반응형

댓글