-
이 글은 RxJava 는 써보고, Flow 는 안 써본 개발자 입장에서 정리한 글.
Core functionality
-
view state 와 effects 를 만들고 업데이트 하기 위해 coroutine 1.3.6 부터는 StateFlow 와 MutableStateFlow 를 제공한다.
Retrofit
-
// Flow
interface FeedService {
@GET("lists/{listType}")
suspend fun getTweets( // suspend 를 붙여준다.
@Path(LIST_TYPE_PATH listType:String,
@Query(LIST_ID_QUERY) listId:String,
@Query(LIST_COUNT_QUERY) count:String,
@Query(LIST_PAGE_NUM_QUERY) page:String
) : List<Tweet> // 바로 List 로 받을 수 있다.
}
-
// Rx
interface FeedService {
@GET("lists/{listType}")
fun getTweets( // suspend 가 없다.
@Path(LIST_TYPE_PATH listType:String,
@Query(LIST_ID_QUERY) listId:String,
@Query(LIST_COUNT_QUERY) count:String,
@Query(LIST_PAGE_NUM_QUERY) page:String
) : Observable<List<Tweet>> // Observable 의 형태로 받는다.
}
Backpressure
-
Backpressure 는 지속적으로 입력되는 데이터 양이 많은 경우, 이를 처리하는데 시간이 오래 걸려 UI 가 느려지거나, 앱이 죽거나, 중요한 정보가 상실되는가 등에 대한 대응을 필요로 함을 이야기한다.
-
Flow 는 기본적으로 sequence emit 을 보장한다. (Rx 의 BUFFER 와 같다) 추가적으로 buffer, conflate, collectLatest 등으로 backpressure 를 다루는 방법을 제공한다.
buffer 는 source data 를 병렬적으로 처리한다. 상황에 따라 몇 개는 preprocess 되어 기다리고 있을 수 있다. 최종 결과는 순서가 보장된다.
conflate 는 제 때 처리하지 못한 data 들 중 마지막 값을 emit 해준다. Rx 의 LATEST 와 동일하다.
collectLatest 는 새로운 데이터가 emit 될 때 느린 collector 를 취소시키거나 재시작시킨다. (suspend function 이 cancel 되면서 바로 다음 값을 처리한다고 보면 되겠다.)
참고 : https://aroundck.tistory.com/6670
-
Rx 는 Flowable 로 backpressure 를 처리한다.
BackpressureStrategy.BUFFER 는 subscriber 가 consume 할 때까지 기다렸다가 emit 해준다. (coroutine 의 기본과 같다.)
BackpressureStrategy.DROP 은 제 때 처리되지 못하는 data 를 emit 단계에서 drop 시킨다. 즉 subscriber 가 다음 값을 request 하는 시점에 값이 없을 수 있다.
BackpressureStrategy.LATEST 는 제 때 처리되지 못한 data 의 마지막 값을 emit 해준다. Flow 의 conflate 와 동일하다.
참고 : https://aroundck.tistory.com/6243
Process Data
-
Flow 가 init 될 때 launchIn 을 통해 lifecycle 이 정의된다.
viewModelScope 는 기본적으로 Dispatchers.Main.immediate thread 를 사용하며, android ViewModel 과 생명주기를 함께 한다.
그래서 main thread 에서 view state 를 return 하는 것이면 충분하다.
feedRepo.initFeed().onEach { results ->
when (results.status) {
LOADING -> // do sth..
SUCCESS -> // do sth with results.data
ERROR -> // do sth..
}
}.launchIn(viewModelScope)
-
Dispatchers.Main 은 launch 등을 수행했을 대 다음 Main thread 의 cycle 에 로직을 수행한다면, Dispatchers.Main.immediate 는 Main 일 경우 바로 로직을 수행시킨다.
-
Rx 의 경우는 CompositeDisposable 등을 만들어서 그곳에서 관리를 해주어야 한다.
private val disposables = CompositeDisposable()
private fun initFeed(){
val disposable = feedRepo.initFeed()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { results ->
when(results.status) {
LOADING -> // do sth..
SUCCESS -> // do wth with results.data
ERROR -> // do sth..
}
}
disposables.add(disposable)
}
private clearDisposables(){
disposables.clear()
}
View states - States + effects
-
View states 는 최종적으로 보여줄 data 를 말하고, View effects 는 error 등의 일회적인 성격의 상황을 이야기한다.
-
Flow 를 이용하면 다음과 같이 할 수 있다.
// MyViewModel.kt
data class _FeedViewState(
val _feed:MutableStateFlow<List<Tweet>?> = MutableStateFlow(null)
)
// MutableStateFlow 가 아닌 Flow 를 전달하기 위함용인듯..
data class FeedViewState(private val _viewState:_FeedViewState){
val feed:Flow<List<Tweet>?> = _viewState._feed.filterNotNull()
}
private val _viewState = _FeedViewState()
val viewState = FeedViewState(_viewState)
init{
initFeed()
}
private fun initFeed(){
feedRepo.initFeed().onEach{ results ->
when (results.status){
...
SUCCESS -> {
...
viewState.feed.value = results.data
}
...
}
}
.flowOn(Dispatchers.IO)
.launchIn(viewModelScope)
}
// MyView.kt
private fun initViewStates(){
viewModel.viewState.feed.onEach { list ->
adapter.submitList(list)
}.launchIn(lifecycleScope)
}
-
참고 자료 : https://proandroiddev.com/udf-flowvsrx-a792b946d75c
끝!
'프로그래밍 놀이터 > Kotlin, Coroutine' 카테고리의 다른 글
[Coroutine] Exception handling in coroutine (0) | 2022.01.30 |
---|---|
[Coroutine] Coroutine scope functions (0) | 2022.01.29 |
[coroutine] SharedFlow & StateFlow (0) | 2021.05.08 |
[kotlin] LazyThreadSafeMode SYNCHRONIZED vs. PUBLICATION (0) | 2021.01.28 |
[Kotlin] Coroutine 소개 (0) | 2020.08.24 |
댓글