Join two streams into one stream

Loading entities from database and server API by using RxJava

Hadi Lashkari Ghouchani
ProAndroidDev
6 min read · Oct 28, 2018

Your app probably has repositories that handle database and server API requests, but have you ever had trouble describing this process with RxJava 2? I say “describe” because RxJava, as a functional programming tool, is meant to be non-imperative, which makes it a little harder to manage. Once you reach a solution, though, it is more readable, more elegant and easier to port to other platforms, because it is written in the universal language of mathematics (category theory, morphisms, functors, monads, etc.), and you can see the beauty of it.

Old Way

Mert Şimşek wrote a post about this approach here. The main function of his solution is this:

But in his scenario the Observable completes after the network response, which is not what we want: the Room database provides Flowables that do not complete, so that they can deliver further updates of the data. Here we want to keep this advantage by keeping the pipe open and delivering updated data.

The Scenario

We want to load data from the database; then, after the database’s first response, we need to send a request to the API; and finally we need to keep the pipe open for any further updates of the data. To achieve this we just need to avoid calling the onComplete method of the observers. With this in mind, and knowing that the database stream does not complete, we always emit updates from the database, including when we fetch from the API. So when we get an API response, instead of propagating it directly, we persist it and let the database be responsible for the emission. Additionally, after the database responds, the UI needs to know that work is in progress, so that it shows a loading indicator until the API response comes back.

Our Way

First of all, we assume every stream is a Flowable instead of an Observable. This is a reasonable assumption because the Room database cannot emit an Observable, and this way we also get the backpressure handling of Flowable. Second, for this scenario we need to define a ResultState class as follows.

Here we assume the returned Flowable emits the Loading state when the database response arrives, so the UI knows to keep showing a loading indicator. When the server API response comes back, we persist it in the database and the database emits the new update, which we wrap in a Success state so the UI knows the loading is finished. After that, if any other process updates the data in the database, the returned Flowable emits a Success state with the updated data. If an error happens anywhere along the way, the Flowable emits an Error state carrying the last available (cached) data. We keep the last data in the Error state because LiveData caches only the single most recent value emitted through it, and the UI needs all of the data to restore its state.
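To make the three states concrete, here is a minimal sketch of what such a class could look like. Only the state names and the shape of their constructors come from the snippets in this post; the field names and the describe helper are assumptions added for illustration.

```kotlin
// Hypothetical sketch: field names are assumptions, not the original class.
sealed class ResultState<T> {
    /** Cached data is shown while a network refresh is in flight. */
    data class Loading<T>(val data: T) : ResultState<T>()

    /** Data emitted by the database after it was persisted or updated. */
    data class Success<T>(val data: T) : ResultState<T>()

    /** Something failed; `data` is the last cached value, if there is one. */
    data class Error<T>(val throwable: Throwable, val data: T?) : ResultState<T>()
}

// Hypothetical helper showing how a UI layer might react to each state.
fun <T> describe(state: ResultState<T>): String = when (state) {
    is ResultState.Loading -> "loading, showing ${state.data}"
    is ResultState.Success -> "done, showing ${state.data}"
    is ResultState.Error -> "failed (${state.throwable.message}), showing ${state.data}"
}
```

Because the class is sealed, the when expression has to cover all three states, so the UI cannot forget to handle one of them.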

The BaseRepository

In the clean architecture of our app, each repository in the data layer is responsible for one Entity in the domain layer, as follows.

You can see in this BaseRepository class that we have a method named perform, which is responsible for loading data from the database and the network. You can also see in the arguments of perform that the database gives us a list of entities. At first glance this looks like a problem, but after reading the post Room and the Mystery of the Lost Events you’ll understand that, to avoid losing events, we need to use a list of entities even for a single entity. (Tip: you can think of this list as an Optional<T>, which unfortunately is only supported from Android API 24, so we cannot use it in our base class.)

The overall behaviour of this function is tested here, and we’ll come back to it soon.

The perform method starts with the line val cachedData = AtomicReference<D?>(). The arguments of this method include a Flowable and a Single, so we have to consider multi-threaded streams, and we use AtomicReference to support that. The cachedData variable is also responsible for caching the latest value emitted by the database Flowable.

The next line is

val processor = PublishProcessor.create<ResultState<D>>()

which we’ll use as a Flowable without an end. In the scenario we want to support, we need a Flowable that never calls the onComplete method of its observers, so we use the processor for this purpose.

val flowable = databaseFlowable.doOnNext {
    cachedData.set(it.firstOrNull())
}.share()

In the lines above we cache the values emitted by the database in cachedData, then apply the share operator, which multicasts a single upstream subscription to all subscribers (it is shorthand for publish().refCount()). This ensures that, although the Flowable is used twice in Flowable.merge, the doOnNext lambda is invoked only once per emitted value. To understand the share operator better, you can see its documentation here.
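The effect of share on the caching side effect can be checked with a small experiment. This is only a sketch: it assumes RxJava 2 (the io.reactivex packages) is on the classpath, and shareDemo, calls and the String entities are hypothetical names used for illustration.

```kotlin
import io.reactivex.Flowable
import io.reactivex.processors.PublishProcessor
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

// Returns how many times doOnNext ran, together with the last cached entity.
fun shareDemo(useShare: Boolean): Pair<Int, String?> {
    val calls = AtomicInteger(0)
    val cachedData = AtomicReference<String?>()
    // Stand-in for the Room Flowable (an assumption for this sketch).
    val database = PublishProcessor.create<List<String>>()

    val cached = database.doOnNext {
        calls.incrementAndGet()
        cachedData.set(it.firstOrNull())
    }
    val flowable = if (useShare) cached.share() else cached

    // Two subscriptions to the same Flowable, as in perform.
    val subscriber = Flowable.merge(flowable.take(1), flowable.skip(1)).test()

    database.onNext(listOf("a"))
    database.onNext(listOf("b"))
    subscriber.assertValues(listOf("a"), listOf("b"))
    return calls.get() to cachedData.get()
}
```

With share, the two emissions trigger doOnNext twice in total. Without it, each branch has its own subscription, so the first emission runs the lambda once per branch and the count becomes three, even though the merged output is the same.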

If we ignore the flatMaps and the onErrorResumeNext method, the return value of perform would be

return Flowable.merge(flowable.take(1), flowable.skip(1))

This line just divides the stream into the first emitted value and the rest; after treating them differently, we merge them into one stream again. Here is the documentation of the merge, take and skip operators.
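As an illustration of this split, the sketch below tags each value with the branch it went through. It assumes RxJava 2 on the classpath; splitDemo and the "first:"/"rest:" labels are hypothetical and only stand in for the different treatment each branch gets in perform.

```kotlin
import io.reactivex.Flowable
import io.reactivex.processors.PublishProcessor

fun splitDemo(): List<String> {
    // Hot source standing in for the shared database Flowable.
    val source = PublishProcessor.create<Int>()
    val flowable = source.share()

    val subscriber = Flowable.merge(
        flowable.take(1).map { "first:$it" }, // only the first value
        flowable.skip(1).map { "rest:$it" }   // everything after it
    ).test()

    source.onNext(1)
    source.onNext(2)
    source.onNext(3)

    // The take(1) branch has completed, but the merged stream stays open
    // because the skip(1) branch is still subscribed to the source.
    subscriber.assertNotComplete()
    return subscriber.values().toList()
}
```

Calling splitDemo() returns first:1, rest:2, rest:3, so every value passes through exactly one of the two branches.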

.onErrorResumeNext {
    concatJustFlowable(ResultState.Error(...), processor)
}

The lines above convert every error in this stream to ResultState.Error, which ensures that onError is never called on the observers of the returned Flowable. This matters because the stream is closed after onError is called, which is bad since we want to keep emitting data updates to the UI. We use the combination .onErrorResumeNext { concatJustFlowable(..., processor) } precisely to avoid completing the returned Flowable.
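The following sketch shows this pattern in isolation, with plain strings instead of ResultState. It assumes RxJava 2 on the classpath; errorDemo and the failing source are hypothetical. The explicit Function type is used here because a bare Kotlin lambda can be ambiguous between the two onErrorResumeNext overloads.

```kotlin
import io.reactivex.Flowable
import io.reactivex.functions.Function
import io.reactivex.processors.PublishProcessor
import io.reactivex.subscribers.TestSubscriber
import org.reactivestreams.Publisher

fun errorDemo(): TestSubscriber<String> {
    // Never completes on its own, like the processor in perform.
    val processor = PublishProcessor.create<String>()
    val failing = Flowable.error<String>(IllegalStateException("boom"))

    return failing
        // Replace the error with one value, then continue with the
        // never-ending processor so that no terminal event is delivered.
        .onErrorResumeNext(Function<Throwable, Publisher<String>> { error ->
            Flowable.concat(Flowable.just("Error: ${error.message}"), processor)
        })
        .test()
}
```

The subscriber receives the error as an ordinary value and sees neither onError nor onComplete, which is the behaviour the UI relies on.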

When the database emits its first value, the following code runs.

if (it.isEmpty()) {
    handleNetSingle(netSingle, persist, cachedData)
} else {
    concatJustFlowable(
        ResultState.Loading(it.first()),
        handleNetSingle(netSingle, persist, cachedData)
    )
}

This part of the code checks whether the database has the value. If it doesn’t, we just send the request to the server API without emitting the Loading state. Otherwise, if the database found a value, we emit the Loading state and then request the updated value from the server API.

For the rest of the stream, however, we just send the Success state with the following code, provided the database actually found an updated value, i.e. the list is not empty.

if (it.isNotEmpty()) {
    concatJustFlowable(ResultState.Success(it.first()), processor)
} else {
    processor
}

We also have a method named concatJustFlowable, which emits a value and then lets a Flowable emit its values. The other method is handleNetSingle, which creates a Flowable from the network Single and persists the data. In case of an error, this method needs to wrap the error with

.onErrorReturn {
    ResultState.Error(it, cachedData)
}

We need to handle network and API errors here, before they reach Flowable.merge: an error passing through merge would terminate the merged stream and break its functionality.
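For reference, here is one way these two helpers could be written. This is a sketch under assumptions, not the original implementation: it assumes RxJava 2 on the classpath, uses a minimal stand-in for ResultState with hypothetical field names, and passes cachedData.get() to the Error state on the assumption that Error carries the cached value itself.

```kotlin
import io.reactivex.Flowable
import io.reactivex.Single
import java.util.concurrent.atomic.AtomicReference

// Minimal stand-in for ResultState (field names are assumptions).
sealed class ResultState<T> {
    data class Loading<T>(val data: T) : ResultState<T>()
    data class Success<T>(val data: T) : ResultState<T>()
    data class Error<T>(val throwable: Throwable, val data: T?) : ResultState<T>()
}

// Emits `value` first, then whatever `rest` emits.
fun <T : Any> concatJustFlowable(value: T, rest: Flowable<T>): Flowable<T> =
    Flowable.concat(Flowable.just(value), rest)

// Persists the network result and emits nothing on success, so the database
// Flowable remains the only source of data. A failure (from the network or
// from persist) is converted into a single Error value instead of onError.
fun <D : Any> handleNetSingle(
    netSingle: Single<D>,
    persist: (D) -> Unit,
    cachedData: AtomicReference<D?>
): Flowable<ResultState<D>> =
    netSingle.toFlowable()
        .flatMap<ResultState<D>> { data ->
            persist(data)
            Flowable.empty<ResultState<D>>()
        }
        .onErrorReturn { ResultState.Error(it, cachedData.get()) }
```

In this sketch a successful request completes without emitting anything, while a failed one emits exactly one Error that carries whatever the database delivered last.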

Testing

As mentioned before, the test methods are here, so let’s briefly explain how they work. In most of these tests we have a database processor that mocks the database behaviour, like this

val database = PublishProcessor.create<List<EntityTest>>()

and also we have a subject to mimic the server API like this

val net = PublishSubject.create<EntityTest>()

Additionally, in most of these tests we have a persist function like this

val persist = { d: EntityTest -> database.onNext(listOf(d)) }

which is responsible for emitting the persisted entity through the database Flowable. Then, after subscribing to the returned Flowable like this

val testSubscriber = usecaseFlowable.test()

we start emitting database and network values and test what happens. In most cases we want to assert that the Flowable has neither completed nor propagated a Throwable.
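To see the whole flow in one place, the sketch below assembles the fragments from this post into a single perform function and drives it with the test setup just described. It is a reconstruction under assumptions rather than the original code: it assumes RxJava 2 on the classpath, ResultState's field names and EntityTest's fields are hypothetical, and the helper bodies are a guess at what the post describes.

```kotlin
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.functions.Function
import io.reactivex.processors.PublishProcessor
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicReference
import org.reactivestreams.Publisher

sealed class ResultState<T> {
    data class Loading<T>(val data: T) : ResultState<T>()
    data class Success<T>(val data: T) : ResultState<T>()
    data class Error<T>(val throwable: Throwable, val data: T?) : ResultState<T>()
}

data class EntityTest(val id: Int, val name: String) // hypothetical fields

fun <T : Any> concatJustFlowable(value: T, rest: Flowable<T>): Flowable<T> =
    Flowable.concat(Flowable.just(value), rest)

fun <D : Any> handleNetSingle(
    netSingle: Single<D>, persist: (D) -> Unit, cachedData: AtomicReference<D?>
): Flowable<ResultState<D>> =
    netSingle.toFlowable()
        .flatMap<ResultState<D>> { persist(it); Flowable.empty<ResultState<D>>() }
        .onErrorReturn { ResultState.Error(it, cachedData.get()) }

// Assembled from the fragments discussed in this post.
fun <D : Any> perform(
    databaseFlowable: Flowable<List<D>>, netSingle: Single<D>, persist: (D) -> Unit
): Flowable<ResultState<D>> {
    val cachedData = AtomicReference<D?>()
    val processor = PublishProcessor.create<ResultState<D>>()
    val flowable = databaseFlowable.doOnNext { cachedData.set(it.firstOrNull()) }.share()
    return Flowable.merge(
        // First database value: optionally emit Loading, then hit the network.
        flowable.take(1).flatMap<ResultState<D>> {
            if (it.isEmpty()) {
                handleNetSingle(netSingle, persist, cachedData)
            } else {
                concatJustFlowable(
                    ResultState.Loading(it.first()),
                    handleNetSingle(netSingle, persist, cachedData)
                )
            }
        },
        // Later database values: emit Success and keep the stream open.
        flowable.skip(1).flatMap<ResultState<D>> {
            if (it.isNotEmpty()) {
                concatJustFlowable(ResultState.Success(it.first()), processor)
            } else {
                processor
            }
        }
    ).onErrorResumeNext(Function<Throwable, Publisher<ResultState<D>>> {
        concatJustFlowable(ResultState.Error(it, cachedData.get()), processor)
    })
}

// Drives perform the way the tests are described.
fun walkThrough(): List<ResultState<EntityTest>> {
    val database = PublishProcessor.create<List<EntityTest>>()
    val net = PublishSubject.create<EntityTest>()
    val persist = { d: EntityTest -> database.onNext(listOf(d)) }

    val testSubscriber = perform(database, net.firstOrError(), persist).test()

    database.onNext(listOf(EntityTest(1, "cached"))) // cached row arrives
    net.onNext(EntityTest(1, "fresh"))               // API answers, gets persisted

    testSubscriber.assertNoErrors()
    testSubscriber.assertNotComplete()
    return testSubscriber.values().toList()
}
```

In this walk-through the subscriber first receives a Loading state with the cached entity and then a Success state with the fresh one, and the stream is still open afterwards.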

Finally, I hope this scenario gives you and the users of your app a better experience. In the end, I want to thank my team, including Reza Bigdeli, Dr. jacky and Elmira farahani, for their endeavour to achieve this result.
