Android Reactive Architecture — part 1: introduction

Arnaud Giuliani
AndroidPub
Published in
6 min readMay 24, 2017

--

Understanding what is a reactive architecture in the context of an Android application — First steps for building your android app with Reactive Streams (Kotlin inside !)

While debugging on one of my apps in production, I had to take the hand on some low level RxJava parts. Here is an opportunity to pause myself and share my understanding RxJava and its role for an Android application.

Why do we talk about reactive applications?

Reactive Architecture on client side ?

For a long time, I was considering the reactive manifesto as server side target only: server application has to be scalable, resilient, responsive, & based on message driven… For me, the image is quiet clear for a backend app. But for client side, the thing didn’t came up immediately.

Reactive Application Traits — From http://www.reactivemanifesto.org/

Ok let’s think again, but for client side:

  • Responsive — The system responds in a timely manner if at all possible
  • Resilient — The system stays responsive in the face of failure
  • Elastic — The system stays responsive under varying workload.
  • Message Driven — Rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency

Responsive and resilient can be easily understood in case of front applications (keep responsive UI and survive to errors). But the other traits are less natural to build… how to make my app being elastic? Use message driven every where?

The reactive streams API is a good way to solve our problematic. Let’s see below.

Using the Reactive Streams

Reactive streams is a standard, and must be seen as a set of tools, to build data exchanges between components in an asynchronous way:

The scope of Reactive Streams is to find a minimal set of interfaces, methods and protocols that will describe the necessary operations and entities to achieve the goal — asynchronous streams of data with non-blocking back pressure.

Such tools help us wire components and notify data & error (also called signals): this is our elastic & message driven aspect. This wiring describes processing, of the data flow with operations.

Asynchronous/Callback API are difficult to reuse and test (hard to test without waiting/rendez-vous, must manage threads…).The reactive streams results are entirely predictable: you only have to define your data flow and operations. You are abstracted from all the asynchronous and threading stuffs, and only care about data and your code.

The final result will be fired (data or error), whatever happens in the processing chain. All asynchronous stuff is managed by the reactive engine.

RxJava, get the basics

We will use here RxJava, as the reactive streams implementation for our Android (Project reactor is not Android friendly yet). Rx use the following terms:

  • Observerreceive signals
  • Observablepublish signals

In reactive streams specification, observer is named subscriber and observable is named publisher.

With such terms, we can easily make rapprochement with Observer / Observable design pattern. But it’s real more than that. It’s about API to build data exchanges in an asynchronous way.

You can add operators to your Observable for many purposes: data transformation, error handling, data buffering, timeout handle… Each stage is a separate unit of processing.

RxMarbles helps you to visualize operators. Here is an example of RxMarble for Map operator:

Map operation transforms incoming data into another one

Error handling is greatly simplified: it can be either treated as an operator (doOnError) or either treated in the observer (onError). This way you can anticipate things in a global way, and avoid to make things too locally for each stage.

Let’s take a simple case: a web client.

Writing our first example

Let’s write a simple HTTP Get with RxJava and Kotlin. We need to create 2 main blocks:

Encapsulate blocking HTTP request in Observable/Observer data flow

Observable lambda is the datasource. Here I’m making a simple HTTP request, wait for a result and fire it to observer. We emit a simple boolean to asset request success (with onSuccess or onError — See Single class).

The makeASimpleHTTPGet() method make a blocking OkHttp Call and return an OkHttp3 Response.

Observer, written with 2 lambdas functions, captures the data and errors (see Observer Class). We use here the clear lambda expressivity of Kotlin to embrace the RxJava API.

If I have any error appears in my chain (in data flow, in operators …), I will receive the error signal in my dedicated lambda function.

In our web example, we don’t need an operator to transform or manipulate data from our HTTP Request. But If you try to execute it in an Android Activity, you will have a NetworkOnMainThreadException error.

Background tasks & Android Lifecycle

First thing to take care when using RxJava with Android, is the Scheduler strategy. You cannot run anything on the main thread on Android. RxJava manages thread pool dispatching for you, but you must tell where to do. Schedulers are thread pools, to deal with your data streams & operations. You can set schedulers with 2 operators:

  • subscribeOn — Scheduler for Observable execution
  • observeOn — Scheduler for Observer execution

For Android, you will have to dispatch your Observable in background and notify result on main thread. You will have to use the RxAndroid library to be able to use AndroidSchedulers.mainThread() scheduler. Check below:

Blocking HTTP request, with schedulers

This way, the request is processed in background in one of Schedulers.computation’s thread and the final result is fired on Android’s main thread. See the documentation here for more details about schedulers.

Don’t forget to dispose your request, when you don’t need anymore:

var disposableRequest: Disposable? = Single ...
disposableRequest?.dispose()

You must conform to your components lifecycle. This mean that we usually dispose request in onStop methods from Activity & Fragment to avoid memory leaks or dirty behaviours.

AsyncTask & Service ?

We have several types of Observable: Single class is for dealing with single data. Observable is for data list. Check here the differences between Single and Observable.

You can replace AsyncTask with RxJava request like this:

Your computation is done in an Observable, notifying for data updates (onNext). Each data update is received in the first lambda. When processing is complete, you call the onComplete signal. This last is catched in the 3rd lambda. (With Single you don’t need to catch onComplete, because you have only one update)

Android Service can run Rx requests to surely keep it in background, but can’t be replaced at all.

That’s it. We discover Reactive Streams with RxJava to build a reactive Android application.

Next ?

For mobile developers, web services (and every remote services) should be considered as a source of problem: bad connectivity, response validation, errors, timeouts… Let’s dive into reactive web parts in my next article 🙂

--

--

Arnaud Giuliani
AndroidPub

Creator/Maintainer of Koin framework -- Kotlin Google Dev Expert -- Cofounder @ Kotzilla.io